diff --git a/src/aleph/api_entrypoint.py b/src/aleph/api_entrypoint.py
index 1f5d8b506..2582bf206 100644
--- a/src/aleph/api_entrypoint.py
+++ b/src/aleph/api_entrypoint.py
@@ -6,6 +6,7 @@
 from configmanager import Config
 
 import aleph.config
+from aleph.chains.chain_service import ChainService
 from aleph.db.connection import make_engine, make_session_factory
 from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
@@ -21,7 +22,10 @@
     APP_STATE_NODE_CACHE,
     APP_STATE_P2P_CLIENT,
     APP_STATE_SESSION_FACTORY,
-    APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, APP_STATE_MQ_WS_CHANNEL,
+    APP_STATE_STORAGE_SERVICE,
+    APP_STATE_MQ_CHANNEL,
+    APP_STATE_MQ_WS_CHANNEL,
+    APP_STATE_CHAIN_SERVICE,
 )
@@ -49,6 +53,9 @@ async def configure_aiohttp_app(
         ipfs_service=ipfs_service,
         node_cache=node_cache,
     )
+    chain_service = ChainService(
+        storage_service=storage_service, session_factory=session_factory
+    )
 
     app = create_aiohttp_app()
@@ -67,6 +74,7 @@ async def configure_aiohttp_app(
     app[APP_STATE_NODE_CACHE] = node_cache
     app[APP_STATE_STORAGE_SERVICE] = storage_service
     app[APP_STATE_SESSION_FACTORY] = session_factory
+    app[APP_STATE_CHAIN_SERVICE] = chain_service
 
     return app
diff --git a/src/aleph/schemas/pending_messages.py b/src/aleph/schemas/pending_messages.py
index 0665c302c..e8203eaf0 100644
--- a/src/aleph/schemas/pending_messages.py
+++ b/src/aleph/schemas/pending_messages.py
@@ -124,6 +124,11 @@ class PendingStoreMessage(BasePendingMessage[Literal[MessageType.store], StoreCo
     pass
 
 
+class PendingInlineStoreMessage(PendingStoreMessage):
+    item_content: str
+    item_type: Literal[ItemType.inline]  # type: ignore[valid-type]
+
+
 MESSAGE_TYPE_TO_CLASS = {
     MessageType.aggregate: PendingAggregateMessage,
     MessageType.forget: PendingForgetMessage,
diff --git a/src/aleph/storage.py b/src/aleph/storage.py
index 39352d85d..983d94366 100644
--- a/src/aleph/storage.py
+++ b/src/aleph/storage.py
@@ -5,6 +5,7 @@
 import logging
 from hashlib import sha256
 from typing import Any, IO, Optional, cast, Final
+from aiohttp import web
 
 from aleph_message.models import ItemType
@@ -19,9 +20,13 @@
 from aleph.services.ipfs.common import get_cid_version
 from aleph.services.p2p.http import request_hash as p2p_http_request_hash
 from aleph.services.storage.engine import StorageEngine
+from aleph.toolkit.constants import MiB
 from aleph.types.db_session import DbSession
 from aleph.types.files import FileType
 from aleph.utils import get_sha256
+from aleph.schemas.pending_messages import (
+    parse_message,
+)
 
 LOGGER = logging.getLogger(__name__)
@@ -239,7 +244,9 @@ async def get_json(
     async def pin_hash(self, chash: str, timeout: int = 30, tries: int = 1):
         await self.ipfs_service.pin_add(cid=chash, timeout=timeout, tries=tries)
 
-    async def add_json(self, session: DbSession, value: Any, engine: ItemType = ItemType.ipfs) -> str:
+    async def add_json(
+        self, session: DbSession, value: Any, engine: ItemType = ItemType.ipfs
+    ) -> str:
         content = aleph_json.dumps(value)
 
         if engine == ItemType.ipfs:
@@ -259,6 +266,17 @@
 
         return chash
 
+    async def add_file_content_to_local_storage(
+        self, session: DbSession, file_content: bytes, file_hash: str
+    ) -> None:
+        await self.storage_engine.write(filename=file_hash, content=file_content)
+        upsert_file(
+            session=session,
+            file_hash=file_hash,
+            size=len(file_content),
+            file_type=FileType.FILE,
+        )
+
     async def add_file(
         self, session: DbSession, fileobject: IO, engine: ItemType = ItemType.ipfs
     ) -> str:
@@ -275,12 +293,8 @@ async def add_file(
         else:
             raise ValueError(f"Unsupported item type: {engine}")
 
-        await self.storage_engine.write(filename=file_hash, content=file_content)
-        upsert_file(
-            session=session,
-            file_hash=file_hash,
-            size=len(file_content),
-            file_type=FileType.FILE,
+        await self.add_file_content_to_local_storage(
+            session=session, file_content=file_content, file_hash=file_hash
         )
 
         return file_hash
diff --git a/src/aleph/web/controllers/app_state_getters.py b/src/aleph/web/controllers/app_state_getters.py
index d6a25b9bf..0b4b3d65b 100644
--- a/src/aleph/web/controllers/app_state_getters.py
+++ b/src/aleph/web/controllers/app_state_getters.py
@@ -11,6 +11,7 @@
 from aleph_p2p_client import AlephP2PServiceClient
 from configmanager import Config
 
+from aleph.chains.chain_service import ChainService
 from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
 from aleph.storage import StorageService
@@ -27,7 +28,7 @@
 APP_STATE_P2P_CLIENT = "p2p_client"
 APP_STATE_SESSION_FACTORY = "session_factory"
 APP_STATE_STORAGE_SERVICE = "storage_service"
-
+APP_STATE_CHAIN_SERVICE = "chain_service"
 
 T = TypeVar("T")
@@ -103,3 +104,7 @@ def get_session_factory_from_request(request: web.Request) -> DbSessionFactory:
 
 def get_storage_service_from_request(request: web.Request) -> StorageService:
     return cast(StorageService, request.app[APP_STATE_STORAGE_SERVICE])
+
+
+def get_chain_service_from_request(request: web.Request) -> ChainService:
+    return cast(ChainService, request.app[APP_STATE_CHAIN_SERVICE])
diff --git a/src/aleph/web/controllers/ipfs.py b/src/aleph/web/controllers/ipfs.py
index 425aae0a6..5506cf102 100644
--- a/src/aleph/web/controllers/ipfs.py
+++ b/src/aleph/web/controllers/ipfs.py
@@ -8,7 +8,7 @@
     get_ipfs_service_from_request,
     get_session_factory_from_request,
 )
-from aleph.web.controllers.utils import multidict_proxy_to_io
+from aleph.web.controllers.utils import file_field_to_io
 
 
 async def ipfs_add_file(request: web.Request):
@@ -20,7 +20,12 @@ async def ipfs_add_file(request: web.Request):
     # No need to pin it here anymore.
     post = await request.post()
-    ipfs_add_response = await ipfs_service.add_file(multidict_proxy_to_io(post))
+    try:
+        file_field = post["file"]
+    except KeyError:
+        raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.")
+
+    ipfs_add_response = await ipfs_service.add_file(file_field_to_io(file_field))
 
     cid = ipfs_add_response["Hash"]
     name = ipfs_add_response["Name"]
diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py
index 6a33bb5ba..4bbb54ac6 100644
--- a/src/aleph/web/controllers/p2p.py
+++ b/src/aleph/web/controllers/p2p.py
@@ -1,57 +1,32 @@
 import asyncio
 import json
 import logging
-from typing import Dict, cast, Optional, Any, Mapping, List, Union
+from typing import Dict, cast, Optional, Any, List, Union
 
-import aio_pika.abc
 from aiohttp import web
 from aleph_p2p_client import AlephP2PServiceClient
 from configmanager import Config
 from pydantic import BaseModel, Field, ValidationError
 
-import aleph.toolkit.json as aleph_json
-from aleph.schemas.pending_messages import parse_message, BasePendingMessage
 from aleph.services.ipfs import IpfsService
 from aleph.services.p2p.pubsub import publish as pub_p2p
 from aleph.toolkit.shield import shielded
-from aleph.types.message_status import (
-    InvalidMessageException,
-    MessageStatus,
-    MessageProcessingStatus,
-)
 from aleph.types.protocol import Protocol
 from aleph.web.controllers.app_state_getters import (
     get_config_from_request,
     get_ipfs_service_from_request,
     get_p2p_client_from_request,
-    get_mq_channel_from_request,
 )
-from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue
+from aleph.web.controllers.utils import (
+    validate_message_dict,
+    broadcast_and_process_message,
+    PublicationStatus,
+    broadcast_status_to_http_status,
+)
 
 LOGGER = logging.getLogger(__name__)
 
 
-class PublicationStatus(BaseModel):
-    status: str
-    failed: List[Protocol]
-
-    @classmethod
-    def from_failures(cls, failed_publications: List[Protocol]):
-        status = {
-            0: "success",
-            1: "warning",
-            2: "error",
-        }[len(failed_publications)]
-        return cls(status=status, failed=failed_publications)
-
-
-def _validate_message_dict(message_dict: Mapping[str, Any]) -> BasePendingMessage:
-    try:
-        return parse_message(message_dict)
-    except InvalidMessageException as e:
-        raise web.HTTPUnprocessableEntity(body=str(e))
-
-
 def _validate_request_data(config: Config, request_data: Dict) -> None:
     """
     Validates the content of a JSON pubsub message depending on the channel
@@ -83,7 +58,7 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:
             reason="'data': must be deserializable as JSON."
         )
 
-    _validate_message_dict(message_dict)
+    validate_message_dict(message_dict)
 
 
 async def _pub_on_p2p_topics(
@@ -142,48 +117,11 @@ async def pub_json(request: web.Request):
     )
 
 
-async def _mq_read_one_message(
-    mq_queue: aio_pika.abc.AbstractQueue, timeout: float
-) -> Optional[aio_pika.abc.AbstractIncomingMessage]:
-    """
-    Consume one element from a message queue and then return.
-    """
-
-    queue: asyncio.Queue = asyncio.Queue()
-
-    async def _process_message(message: aio_pika.abc.AbstractMessage):
-        await queue.put(message)
-
-    consumer_tag = await mq_queue.consume(_process_message, no_ack=True)
-
-    try:
-        return await asyncio.wait_for(queue.get(), timeout)
-    except asyncio.TimeoutError:
-        return None
-    finally:
-        await mq_queue.cancel(consumer_tag)
-
-
-def _processing_status_to_http_status(status: MessageProcessingStatus) -> int:
-    mapping = {
-        MessageProcessingStatus.PROCESSED_NEW_MESSAGE: 200,
-        MessageProcessingStatus.PROCESSED_CONFIRMATION: 200,
-        MessageProcessingStatus.FAILED_WILL_RETRY: 202,
-        MessageProcessingStatus.FAILED_REJECTED: 422,
-    }
-    return mapping[status]
-
-
 class PubMessageRequest(BaseModel):
     sync: bool = False
     message_dict: Dict[str, Any] = Field(alias="message")
 
 
-class PubMessageResponse(BaseModel):
-    publication_status: PublicationStatus
-    message_status: Optional[MessageStatus]
-
-
 @shielded
 async def pub_message(request: web.Request):
@@ -194,76 +132,14 @@ async def pub_message(request: web.Request):
         # Body must be valid JSON
         raise web.HTTPUnprocessableEntity()
 
-    pending_message = _validate_message_dict(request_data.message_dict)
-
-    # In sync mode, wait for a message processing event. We need to create the queue
-    # before publishing the message on P2P topics in order to guarantee that the event
-    # will be picked up.
-    config = get_config_from_request(request)
-
-    if request_data.sync:
-        mq_channel = await get_mq_channel_from_request(request=request, logger=LOGGER)
-        mq_queue = await mq_make_aleph_message_topic_queue(
-            channel=mq_channel,
-            config=config,
-            routing_key=f"*.{pending_message.item_hash}",
-        )
-    else:
-        mq_queue = None
-
-    # We publish the message on P2P topics early, for 3 reasons:
-    # 1. Just because this node is unable to process the message does not
-    #    necessarily mean the message is incorrect (ex: bug in a new version).
-    # 2. If the publication fails after the processing, we end up in a situation where
-    #    a message exists without being propagated to the other nodes, ultimately
-    #    causing sync issues on the network.
-    # 3. The message is currently fed to this node using the P2P service client
-    #    loopback mechanism.
-    ipfs_service = get_ipfs_service_from_request(request)
-    p2p_client = get_p2p_client_from_request(request)
-
-    message_topic = config.aleph.queue_topic.value
-    failed_publications = await _pub_on_p2p_topics(
-        p2p_client=p2p_client,
-        ipfs_service=ipfs_service,
-        topic=message_topic,
-        payload=aleph_json.dumps(request_data.message_dict),
+    pending_message = validate_message_dict(request_data.message_dict)
+    broadcast_status = await broadcast_and_process_message(
+        pending_message=pending_message,
+        message_dict=request_data.message_dict,
+        sync=request_data.sync,
+        request=request,
+        logger=LOGGER,
     )
-    pub_status = PublicationStatus.from_failures(failed_publications)
-    if pub_status.status == "error":
-        return web.json_response(
-            text=PubMessageResponse(
-                publication_status=pub_status, message_status=None
-            ).json(),
-            status=500,
-        )
-
-    status = PubMessageResponse(
-        publication_status=pub_status, message_status=MessageStatus.PENDING
-    )
-
-    # When publishing in async mode, just respond with 202 (Accepted).
-    message_accepted_response = web.json_response(text=status.json(), status=202)
-    if not request_data.sync:
-        return message_accepted_response
-
-    # Ignore type checking here, we know that mq_queue is set at this point
-    assert mq_queue is not None
-    response = await _mq_read_one_message(mq_queue, timeout=30)
-
-    # Delete the queue immediately
-    await mq_queue.delete(if_empty=False)
-
-    # If the message was not processed before the timeout, return a 202.
-    if response is None:
-        return message_accepted_response
-
-    routing_key = response.routing_key
-    assert routing_key is not None  # again, for type checking
-    status_str, _item_hash = routing_key.split(".")
-    processing_status = MessageProcessingStatus(status_str)
-    status_code = _processing_status_to_http_status(processing_status)
-
-    status.message_status = processing_status.to_message_status()
-    return web.json_response(text=status.json(), status=status_code)
+    status_code = broadcast_status_to_http_status(broadcast_status)
+    return web.json_response(text=broadcast_status.json(), status=status_code)
diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py
index 8075ba2da..cc16c42a7 100644
--- a/src/aleph/web/controllers/storage.py
+++ b/src/aleph/web/controllers/storage.py
@@ -1,23 +1,50 @@
 import base64
 import logging
+from decimal import Decimal
+from typing import Union, Optional, Protocol
 
+import aio_pika
+import pydantic
 from aiohttp import web
-from aleph_message.models import ItemType
-
+from aiohttp.web_request import FileField
+from aleph_message.models import ItemType, StoreContent
+from pydantic import ValidationError
+
+from aleph.chains.chain_service import ChainService
+from aleph.db.accessors.balances import get_total_balance
+from aleph.db.accessors.cost import get_total_cost_for_address
 from aleph.db.accessors.files import count_file_pins, get_file
 from aleph.exceptions import AlephStorageException, UnknownHashError
-from aleph.types.db_session import DbSessionFactory, DbSession
-from aleph.utils import run_in_executor, item_type_from_hash
+from aleph.schemas.pending_messages import (
+    BasePendingMessage,
+    PendingStoreMessage,
+    PendingInlineStoreMessage,
+)
+from aleph.storage import StorageService
+from aleph.toolkit.constants import MiB
+from aleph.types.db_session import DbSession
+from aleph.types.message_status import (
+    InvalidSignature,
+)
+from aleph.utils import run_in_executor, item_type_from_hash, get_sha256
 from aleph.web.controllers.app_state_getters import (
     get_session_factory_from_request,
     get_storage_service_from_request,
+    get_config_from_request,
+    get_mq_channel_from_request,
+    get_chain_service_from_request,
+)
+from aleph.web.controllers.utils import (
+    mq_make_aleph_message_topic_queue,
+    broadcast_and_process_message,
+    broadcast_status_to_http_status,
 )
-from aleph.web.controllers.utils import multidict_proxy_to_io
 
 logger = logging.getLogger(__name__)
 
-
-MAX_FILE_SIZE = 100 * 1024 * 1024
+MAX_FILE_SIZE = 100 * MiB
+MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE = 25 * MiB
+MAX_UPLOAD_FILE_SIZE = 1000 * MiB
 
 
 async def add_ipfs_json_controller(request: web.Request):
@@ -56,22 +83,201 @@ async def add_storage_json_controller(request: web.Request):
     return web.json_response(output)
 
 
+async def _verify_message_signature(
+    pending_message: BasePendingMessage, chain_service: ChainService
+) -> None:
+    try:
+        await chain_service.verify_signature(pending_message)
+    except InvalidSignature:
+        raise web.HTTPForbidden()
+
+
+async def _verify_user_balance(session: DbSession, address: str, size: int) -> None:
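+    # Pricing rule as implemented below: one token of balance funds 3 MiB of
+    # storage, uploads up to MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE are free,
+    # and larger uploads must fit in the balance remaining after the
+    # address' existing storage costs.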
+    current_balance = get_total_balance(session=session, address=address) or Decimal(0)
+    required_balance = (size / MiB) / 3
+    current_cost_for_user = get_total_cost_for_address(session=session, address=address)
+    if size > MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE:
+        if current_balance < (Decimal(required_balance) + current_cost_for_user):
+            raise web.HTTPPaymentRequired()
+
+
+class StorageMetadata(pydantic.BaseModel):
+    message: PendingInlineStoreMessage
+    sync: bool
+
+
+class UploadedFile(Protocol):
+    @property
+    def size(self) -> int:
+        ...
+
+    @property
+    def content(self) -> Union[str, bytes]:
+        ...
+
+
+class MultipartUploadedFile:
+    _content: Optional[bytes]
+
+    def __init__(self, file_field: FileField):
+        self.file_field = file_field
+
+        try:
+            content_length_str = file_field.headers["Content-Length"]
+            self.size = int(content_length_str)
+        except (KeyError, ValueError):
+            raise web.HTTPUnprocessableEntity(
+                reason="Invalid/missing Content-Length header."
+            )
+        self._content = None
+
+    @property
+    def content(self) -> bytes:
+        # Only read the stream once
+        if self._content is None:
+            self._content = self.file_field.file.read(self.size)
+
+        return self._content
+
+
+class RawUploadedFile:
+    def __init__(self, content: Union[bytes, str]):
+        self.content = content
+
+    @property
+    def size(self) -> int:
+        return len(self.content)
+
+
+async def _check_and_add_file(
+    session: DbSession,
+    chain_service: ChainService,
+    storage_service: StorageService,
+    message: Optional[PendingStoreMessage],
+    file: UploadedFile,
+) -> str:
+    # Perform authentication and balance checks
+    if message:
+        await _verify_message_signature(
+            pending_message=message, chain_service=chain_service
+        )
+        try:
+            message_content = StoreContent.parse_raw(message.item_content)
+        except ValidationError as e:
+            raise web.HTTPUnprocessableEntity(
+                reason=f"Invalid store message content: {e.json()}"
+            )
+
+        await _verify_user_balance(
+            session=session,
+            address=message_content.address,
+            size=file.size,
+        )
+
+    else:
+        message_content = None
+
+    # TODO: this can still reach 1 GiB in memory. We should look into streaming.
+    file_content = file.content
+    file_bytes = (
+        file_content.encode("utf-8") if isinstance(file_content, str) else file_content
+    )
+    file_hash = get_sha256(file_bytes)
+
+    if message_content:
+        if message_content.item_hash != file_hash:
+            raise web.HTTPUnprocessableEntity(
+                reason=f"File hash does not match ({file_hash} != {message_content.item_hash})"
+            )
+
+    await storage_service.add_file_content_to_local_storage(
+        session=session,
+        file_content=file_bytes,
+        file_hash=file_hash,
+    )
+
+    return file_hash
+
+
+async def _make_mq_queue(
+    request: web.Request,
+    sync: bool,
+    routing_key: Optional[str] = None,
+) -> Optional[aio_pika.abc.AbstractQueue]:
+    if not sync:
+        return None
+
+    mq_channel = await get_mq_channel_from_request(request, logger)
+    config = get_config_from_request(request)
+    return await mq_make_aleph_message_topic_queue(
+        channel=mq_channel, config=config, routing_key=routing_key
+    )
+
+
 async def storage_add_file(request: web.Request):
     storage_service = get_storage_service_from_request(request)
     session_factory = get_session_factory_from_request(request)
+    chain_service: ChainService = get_chain_service_from_request(request)
 
-    # No need to pin it here anymore.
-    # TODO: find a way to specify linked ipfs hashes in posts/aggr.
     post = await request.post()
-    file_io = multidict_proxy_to_io(post)
+    try:
+        file_field = post["file"]
+    except KeyError:
+        raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.")
+
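+    # Uploads arrive either as raw str/bytes form values or as multipart
+    # FileField objects; both are wrapped behind the UploadedFile protocol
+    # so the size and balance checks below can treat them uniformly.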
+    if isinstance(file_field, FileField):
+        uploaded_file: UploadedFile = MultipartUploadedFile(file_field)
+    else:
+        uploaded_file = RawUploadedFile(file_field)
+
+    metadata = post.get("metadata")
+
+    status_code = 200
+
+    if metadata:
+        metadata_bytes = (
+            metadata.file.read() if isinstance(metadata, FileField) else metadata
+        )
+        try:
+            storage_metadata = StorageMetadata.parse_raw(metadata_bytes)
+        except ValidationError as e:
+            raise web.HTTPUnprocessableEntity(
+                reason=f"Could not decode metadata: {e.json()}"
+            )
+
+        message = storage_metadata.message
+        sync = storage_metadata.sync
+        max_upload_size = MAX_UPLOAD_FILE_SIZE
+
+    else:
+        # User did not provide a message in the `metadata` field
+        message = None
+        sync = False
+        max_upload_size = MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE
+
+    if uploaded_file.size > max_upload_size:
+        raise web.HTTPRequestEntityTooLarge(
+            actual_size=uploaded_file.size, max_size=max_upload_size
+        )
+
     with session_factory() as session:
-        file_hash = await storage_service.add_file(
-            session=session, fileobject=file_io, engine=ItemType.storage
+        file_hash = await _check_and_add_file(
+            session=session,
+            chain_service=chain_service,
+            storage_service=storage_service,
+            message=message,
+            file=uploaded_file,
         )
         session.commit()
 
+    if message:
+        broadcast_status = await broadcast_and_process_message(
+            pending_message=message, sync=sync, request=request, logger=logger
+        )
+        status_code = broadcast_status_to_http_status(broadcast_status)
+
     output = {"status": "success", "hash": file_hash}
-    return web.json_response(output)
+    return web.json_response(data=output, status=status_code)
 
 
 def assert_file_is_downloadable(session: DbSession, file_hash: str) -> None:
diff --git a/src/aleph/web/controllers/utils.py b/src/aleph/web/controllers/utils.py
index b8267cb3b..bd69cb301 100644
--- a/src/aleph/web/controllers/utils.py
+++ b/src/aleph/web/controllers/utils.py
@@ -1,24 +1,59 @@
+import asyncio
 import json
+import logging
 from io import BytesIO, StringIO
 from math import ceil
-from typing import Optional, Union, IO
+from typing import Optional, Any, Mapping, List, Union, Dict
+from typing import overload
 
 import aio_pika
+import aio_pika.abc
 import aiohttp_jinja2
 from aiohttp import web
 from aiohttp.web_request import FileField
+from aleph_p2p_client import AlephP2PServiceClient
 from configmanager import Config
-from multidict import MultiDictProxy
+from pydantic import BaseModel
+
+import aleph.toolkit.json as aleph_json
+from aleph.schemas.pending_messages import parse_message, BasePendingMessage
+from aleph.services.ipfs import IpfsService
+from aleph.services.p2p.pubsub import publish as pub_p2p
+from aleph.toolkit.shield import shielded
+from aleph.types.message_status import (
+    InvalidMessageException,
+    MessageStatus,
+    MessageProcessingStatus,
+)
+from aleph.types.protocol import Protocol
+from aleph.web.controllers.app_state_getters import (
+    get_config_from_request,
+    get_ipfs_service_from_request,
+    get_p2p_client_from_request,
+    get_mq_channel_from_request,
+)
 
 DEFAULT_MESSAGES_PER_PAGE = 20
 DEFAULT_PAGE = 1
 LIST_FIELD_SEPARATOR = ","
 
 
-def multidict_proxy_to_io(
-    multi_dict: MultiDictProxy[Union[str, bytes, FileField]]
-) -> IO:
-    file_field = multi_dict["file"]
+@overload
+def file_field_to_io(file_field: bytes) -> BytesIO:
+    ...
+
+
+@overload
+def file_field_to_io(file_field: str) -> StringIO:
+    ...
+
+
+@overload
+def file_field_to_io(file_field: FileField) -> BytesIO:
+    ...
+
+
+def file_field_to_io(file_field):
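+    """Expose an uploaded form field as a file-like object (BytesIO/StringIO
+    for raw bytes/str values, per the overloads above)."""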
     if isinstance(file_field, bytes):
         return BytesIO(file_field)
     elif isinstance(file_field, str):
         return StringIO(file_field)
@@ -153,10 +188,199 @@ async def mq_make_aleph_message_topic_queue(
         auto_delete=False,
     )
     mq_queue = await channel.declare_queue(
-        auto_delete=True, exclusive=True,
+        auto_delete=True,
+        exclusive=True,
         # Auto-delete the queue after 30 seconds. This guarantees that queues are deleted even
         # if a bug makes the consumer crash before cleanup.
-        arguments={"x-expires": 30000}
+        arguments={"x-expires": 30000},
     )
     await mq_queue.bind(mq_message_exchange, routing_key=routing_key)
 
     return mq_queue
+
+
+def processing_status_to_http_status(status: MessageProcessingStatus) -> int:
+    mapping = {
+        MessageProcessingStatus.PROCESSED_NEW_MESSAGE: 200,
+        MessageProcessingStatus.PROCESSED_CONFIRMATION: 200,
+        MessageProcessingStatus.FAILED_WILL_RETRY: 202,
+        MessageProcessingStatus.FAILED_REJECTED: 422,
+    }
+    return mapping[status]
+
+
+def message_status_to_http_status(status: MessageStatus) -> int:
+    mapping = {
+        MessageStatus.PENDING: 202,
+        MessageStatus.PROCESSED: 200,
+        MessageStatus.REJECTED: 422,
+    }
+    return mapping[status]
+
+
+async def mq_read_one_message(
+    mq_queue: aio_pika.abc.AbstractQueue, timeout: float
+) -> Optional[aio_pika.abc.AbstractIncomingMessage]:
+    """
+    Consume one element from a message queue and then return.
+    """
+
+    queue: asyncio.Queue = asyncio.Queue()
+
+    async def _process_message(message: aio_pika.abc.AbstractMessage):
+        await queue.put(message)
+
+    consumer_tag = await mq_queue.consume(_process_message, no_ack=True)
+
+    try:
+        return await asyncio.wait_for(queue.get(), timeout)
+    except asyncio.TimeoutError:
+        return None
+    finally:
+        await mq_queue.cancel(consumer_tag)
+
+
+def validate_message_dict(message_dict: Mapping[str, Any]) -> BasePendingMessage:
+    try:
+        return parse_message(message_dict)
+    except InvalidMessageException as e:
+        raise web.HTTPUnprocessableEntity(body=str(e))
+
+
+class PublicationStatus(BaseModel):
+    status: str
+    failed: List[Protocol]
+
+    @classmethod
+    def from_failures(cls, failed_publications: List[Protocol]):
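+        # Exactly two transports are attempted (P2P and IPFS), so the number
+        # of failures maps to: 0 -> "success", 1 -> "warning", 2 -> "error".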
+        status = {
+            0: "success",
+            1: "warning",
+            2: "error",
+        }[len(failed_publications)]
+        return cls(status=status, failed=failed_publications)
+
+
+async def pub_on_p2p_topics(
+    p2p_client: AlephP2PServiceClient,
+    ipfs_service: Optional[IpfsService],
+    topic: str,
+    payload: Union[str, bytes],
+    logger: logging.Logger,
+) -> List[Protocol]:
+
+    failed_publications = []
+
+    if ipfs_service:
+        try:
+            await asyncio.wait_for(ipfs_service.pub(topic, payload), 10)
+        except Exception:
+            logger.exception("Can't publish on ipfs")
+            failed_publications.append(Protocol.IPFS)
+
+    try:
+        await asyncio.wait_for(
+            pub_p2p(
+                p2p_client,
+                topic,
+                payload,
+                loopback=True,
+            ),
+            10,
+        )
+    except Exception:
+        logger.exception("Can't publish on p2p")
+        failed_publications.append(Protocol.P2P)
+
+    return failed_publications
+
+
+class BroadcastStatus(BaseModel):
+    publication_status: PublicationStatus
+    message_status: Optional[MessageStatus]
+
+
+def broadcast_status_to_http_status(broadcast_status: BroadcastStatus) -> int:
+    if broadcast_status.publication_status.status == "error":
+        return 500
+
+    message_status = broadcast_status.message_status
+    # Message status should always be set if the publication succeeded
+    # TODO: improve typing to make this check useless
+    assert message_status is not None
+    return message_status_to_http_status(message_status)
+
+
+@shielded
+async def broadcast_and_process_message(
+    pending_message: BasePendingMessage,
+    sync: bool,
+    request: web.Request,
+    logger: logging.Logger,
+    message_dict: Optional[Dict[str, Any]] = None,
+) -> BroadcastStatus:
+    # In sync mode, wait for a message processing event. We need to create the queue
+    # before publishing the message on P2P topics in order to guarantee that the event
+    # will be picked up.
+    config = get_config_from_request(request)
+
+    if sync:
+        mq_channel = await get_mq_channel_from_request(request=request, logger=logger)
+        mq_queue = await mq_make_aleph_message_topic_queue(
+            channel=mq_channel,
+            config=config,
+            routing_key=f"*.{pending_message.item_hash}",
+        )
+    else:
+        mq_queue = None
+
+    # We publish the message on P2P topics early, for 3 reasons:
+    # 1. Just because this node is unable to process the message does not
+    #    necessarily mean the message is incorrect (ex: bug in a new version).
+    # 2. If the publication fails after the processing, we end up in a situation where
+    #    a message exists without being propagated to the other nodes, ultimately
+    #    causing sync issues on the network.
+    # 3. The message is currently fed to this node using the P2P service client
+    #    loopback mechanism.
+    ipfs_service = get_ipfs_service_from_request(request)
+    p2p_client = get_p2p_client_from_request(request)
+
+    message_topic = config.aleph.queue_topic.value
+    message_dict = message_dict or pending_message.dict(exclude_none=True)
+
+    failed_publications = await pub_on_p2p_topics(
+        p2p_client=p2p_client,
+        ipfs_service=ipfs_service,
+        topic=message_topic,
+        payload=aleph_json.dumps(message_dict),
+        logger=logger,
+    )
+    pub_status = PublicationStatus.from_failures(failed_publications)
+    if pub_status.status == "error":
+        return BroadcastStatus(publication_status=pub_status, message_status=None)
+
+    status = BroadcastStatus(
+        publication_status=pub_status, message_status=MessageStatus.PENDING
+    )
+
+    # When publishing in async mode, just respond with 202 (Accepted).
+    if not sync:
+        return status
+
+    # Ignore type checking here, we know that mq_queue is set at this point
+    assert mq_queue is not None
+    response = await mq_read_one_message(mq_queue, timeout=30)
+
+    # Delete the queue immediately
+    await mq_queue.delete(if_empty=False)
+
+    # If the message was not processed before the timeout, return a 202.
+    if response is None:
+        return status
+
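+    # MQ routing keys follow the "<processing_status>.<item_hash>" pattern
+    # (the queue above is bound with "*.<item_hash>"), so the first segment
+    # decodes into a MessageProcessingStatus value.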
+    routing_key = response.routing_key
+    assert routing_key is not None  # again, for type checking
+    status_str, _item_hash = routing_key.split(".")
+    processing_status = MessageProcessingStatus(status_str)
+
+    status.message_status = processing_status.to_message_status()
+    return status
diff --git a/tests/api/test_p2p.py b/tests/api/test_p2p.py
index ee352319d..93e97df3b 100644
--- a/tests/api/test_p2p.py
+++ b/tests/api/test_p2p.py
@@ -69,16 +69,16 @@ async def test_pubsub_pub_errors(ccn_api_client, mock_config: Config):
 
 @pytest.mark.asyncio
 async def test_post_message_sync(ccn_api_client, mocker):
     # Mock the functions used to create the RabbitMQ queue
-    mocker.patch("aleph.web.controllers.p2p.get_mq_channel_from_request")
+    mocker.patch("aleph.web.controllers.utils.get_mq_channel_from_request")
     mocked_queue = mocker.patch(
-        "aleph.web.controllers.p2p.mq_make_aleph_message_topic_queue"
+        "aleph.web.controllers.utils.mq_make_aleph_message_topic_queue"
     )
 
     # Create a mock MQ response object
     mock_mq_message = mocker.Mock()
     mock_mq_message.routing_key = f"processed.{MESSAGE_DICT['item_hash']}"
     mocker.patch(
-        "aleph.web.controllers.p2p._mq_read_one_message", return_value=mock_mq_message
+        "aleph.web.controllers.utils.mq_read_one_message", return_value=mock_mq_message
     )
 
     response = await ccn_api_client.post(
diff --git a/tests/api/test_storage.py b/tests/api/test_storage.py
index 0965186fb..9be988f52 100644
--- a/tests/api/test_storage.py
+++ b/tests/api/test_storage.py
@@ -1,15 +1,21 @@
 import base64
-from typing import Any
+import json
+from decimal import Decimal
+from typing import Any, Optional
 
 import aiohttp
 import orjson
 import pytest
-from aleph_message.models import ItemHash
+from aleph_message.models import ItemHash, Chain
 
+from aleph.chains.chain_service import ChainService
 from aleph.db.accessors.files import get_file
+from aleph.db.models import AlephBalanceDb
 from aleph.storage import StorageService
 from aleph.types.db_session import DbSessionFactory
 from aleph.types.files import FileType
+from aleph.types.message_status import MessageStatus
+from aleph.web.controllers.utils import BroadcastStatus, PublicationStatus
 from in_memory_storage_engine import InMemoryStorageEngine
 
 IPFS_ADD_FILE_URI = "/api/v0/ipfs/add_file"
@@ -31,6 +38,25 @@
     "b7c7b2db0bcec890b8c859b2b76e7c998de15e31ccc945bc7425c4bdc091a0b2"
 )
 
+MESSAGE_DICT = {
+    "chain": "ETH",
+    "sender": "0x6dA130FD646f826C1b8080C07448923DF9a79aaA",
+    "type": "STORE",
+    "channel": "null",
+    "signature": "0x2b90dcfa8f93506150df275a4fe670e826be0b4b751badd6ec323648a6a738962f47274f71a9939653fb6d49c25055821f547447fb3b33984a579008d93eca431b",
+    "time": 1692193373.7144432,
+    "item_type": "inline",
+    "item_content": '{"address":"0x6dA130FD646f826C1b8080C07448923DF9a79aaA","time":1692193373.714271,"item_type":"storage","item_hash":"0214e5578f5acb5d36ea62255cbf1157a4bdde7b9612b5db4899b2175e310b6f","mime_type":"text/plain"}',
+    "item_hash": "8227acbc2f7c43899efd9f63ea9d8119a4cb142f3ba2db5fe499ccfab86dfaed",
+    "content": {
+        "address": "0x6dA130FD646f826C1b8080C07448923DF9a79aaA",
+        "time": 1692193373.714271,
+        "item_type": "storage",
+        "item_hash": "0214e5578f5acb5d36ea62255cbf1157a4bdde7b9612b5db4899b2175e310b6f",
+        "mime_type": "text/plain",
+    },
+}
+
 
 @pytest.fixture
 def api_client(ccn_api_client, mocker):
@@ -59,6 +85,12 @@ def api_client(ccn_api_client, mocker):
         ipfs_service=ipfs_service,
         node_cache=mocker.AsyncMock(),
     )
+
+    ccn_api_client.app["chain_service"] = ChainService(
+        session_factory=ccn_api_client.app["session_factory"],
+        storage_service=ccn_api_client.app["storage_service"],
+    )
+
     return ccn_api_client
 
 
@@ -73,7 +105,8 @@ async def add_file(
     form_data.add_field("file", file_content)
 
     post_response = await api_client.post(uri, data=form_data)
-    assert post_response.status == 200, await post_response.text()
+    response_text = await post_response.text()
+    assert post_response.status == 200, response_text
     post_response_json = await post_response.json()
     assert post_response_json["status"] == "success"
     file_hash = post_response_json["hash"]
@@ -81,7 +114,7 @@ async def add_file(
 
     # Assert that the file is downloadable
    get_file_response = await api_client.get(f"{GET_STORAGE_RAW_URI}/{file_hash}")
-    assert get_file_response.status == 200
+    assert get_file_response.status == 200, await get_file_response.text()
     response_data = await get_file_response.read()
 
     # Check that the file appears in the DB
@@ -95,6 +128,90 @@ async def add_file(
     assert response_data == file_content
 
 
+async def add_file_with_message(
+    api_client,
+    session_factory: DbSessionFactory,
+    uri: str,
+    file_content: bytes,
+    error_code: int,
+    balance: int,
+    mocker,
+):
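+    # broadcast_and_process_message is patched out below, so this helper
+    # exercises only the upload endpoint (signature and balance checks plus
+    # local storage), not the full message processing pipeline.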
+ "1000", + ), + ], +) +@pytest.mark.asyncio +async def test_storage_add_file_with_message( + api_client, + session_factory: DbSessionFactory, + file_content, + expected_hash, + size: Optional[int], + error_code, + balance, + mocker, +): + await add_file_with_message( + api_client, + session_factory, + uri=STORAGE_ADD_FILE_URI, + file_content=file_content, + error_code=int(error_code), + balance=int(balance), + mocker=mocker, + ) + + +@pytest.mark.parametrize( + "file_content, expected_hash, size, error_code, balance", + [ + ( + b"Hello Aleph.im\n", + "0214e5578f5acb5d36ea62255cbf1157a4bdde7b9612b5db4899b2175e310b6f", + "15", + "202", + "1000", + ), + ], +) +@pytest.mark.asyncio +async def test_storage_add_file_with_message_202( + api_client, + session_factory: DbSessionFactory, + file_content, + expected_hash, + size, + error_code, + balance, + mocker, +): + await add_file_with_message_202( + api_client, + session_factory, + uri=STORAGE_ADD_FILE_URI, + file_content=file_content, + size=size, + error_code=int(error_code), + balance=int(balance), + mocker=mocker, + ) + + @pytest.mark.asyncio async def test_ipfs_add_file(api_client, session_factory: DbSessionFactory): await add_file(