Skip to content

Feature: authenticated file upload #463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e0d201a
Features: Secure Endpoint
1yam Aug 21, 2023
d96ffa4
Features: Secure upload endpoint
1yam Aug 21, 2023
cccb53f
Fix : Clear test
1yam Aug 21, 2023
3873b96
Internal: add test for /add_file (200, 422, 402 tested)
1yam Aug 22, 2023
7367509
Fix: Mypy error
1yam Aug 22, 2023
35a2c16
Refactor: Use MQ queue to trace status of the message
1yam Aug 22, 2023
ce83b88
Fix: Test
1yam Aug 22, 2023
41e1a86
Fix: the message was not commit to the db
1yam Aug 22, 2023
23ecf40
Fix : Remove unused function
1yam Aug 22, 2023
b45ecaa
Refactor: Move private functions to public
1yam Aug 23, 2023
fdb8af9
Fix: use ChainService instead of re create verify_signature
1yam Aug 24, 2023
1d1efb4
Fix: missed to push it
1yam Aug 24, 2023
55e2283
Fix: Use MiB instead of MB
1yam Aug 24, 2023
45809a1
Fix
1yam Aug 24, 2023
455a4b8
Refactor: storage_add
1yam Aug 25, 2023
11451e1
Fix: Miss one conditions
1yam Aug 25, 2023
85745cc
Fix: mypy error
1yam Aug 25, 2023
fa37f20
some Refactor
1yam Aug 25, 2023
79bd1f8
Fix: balances contrôle check all cost for user
1yam Aug 25, 2023
76256d0
Fix: mypy error
1yam Aug 25, 2023
553ee31
Update src/aleph/web/controllers/storage.py
1yam Aug 28, 2023
68e4dba
Fix: add_file functions who did not seek file object
1yam Aug 28, 2023
ab95c92
Fix: conditions
1yam Aug 28, 2023
adc24d4
Fix: Last bug & black
1yam Aug 28, 2023
fa2c19e
Fix: Contrôle balance only if file > 25 MiB
1yam Aug 28, 2023
ae73641
Fix: conditions for balance control
1yam Aug 28, 2023
fd3e64a
Fix: File < 25 MB so return code will be 200
1yam Aug 28, 2023
d54959b
It's only a test
1yam Aug 28, 2023
b8d9b17
Revert "It's only a test"
1yam Aug 28, 2023
a1dc0ff
Refactor: more opti
1yam Sep 5, 2023
c7ea999
pair programming review
odesenfans Sep 6, 2023
3d8a782
mypy fix 1
odesenfans Sep 6, 2023
a6335b4
fix mypy
1yam Sep 7, 2023
127c4f0
upload works again
odesenfans Sep 8, 2023
fd27890
nearly done, need mypy fixes + decide what to do with size test
odesenfans Sep 8, 2023
ad2283f
fix mypy and remove size test
odesenfans Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/aleph/api_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from configmanager import Config

import aleph.config
from aleph.chains.chain_service import ChainService
from aleph.db.connection import make_engine, make_session_factory
from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService
Expand All @@ -21,7 +22,10 @@
APP_STATE_NODE_CACHE,
APP_STATE_P2P_CLIENT,
APP_STATE_SESSION_FACTORY,
APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL, APP_STATE_MQ_WS_CHANNEL,
APP_STATE_STORAGE_SERVICE,
APP_STATE_MQ_CHANNEL,
APP_STATE_MQ_WS_CHANNEL,
APP_STATE_CHAIN_SERVICE,
)


Expand Down Expand Up @@ -49,6 +53,9 @@ async def configure_aiohttp_app(
ipfs_service=ipfs_service,
node_cache=node_cache,
)
chain_service = ChainService(
storage_service=storage_service, session_factory=session_factory
)

app = create_aiohttp_app()

Expand All @@ -67,6 +74,7 @@ async def configure_aiohttp_app(
app[APP_STATE_NODE_CACHE] = node_cache
app[APP_STATE_STORAGE_SERVICE] = storage_service
app[APP_STATE_SESSION_FACTORY] = session_factory
app[APP_STATE_CHAIN_SERVICE] = chain_service

return app

Expand Down
5 changes: 5 additions & 0 deletions src/aleph/schemas/pending_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ class PendingStoreMessage(BasePendingMessage[Literal[MessageType.store], StoreCo
pass


class PendingInlineStoreMessage(PendingStoreMessage):
    """Pending STORE message whose content is embedded inline in the message.

    Narrows PendingStoreMessage: ``item_content`` must be present (a string)
    and ``item_type`` must be ``ItemType.inline``.
    """

    # Inline messages always carry their payload directly as a string.
    item_content: str
    item_type: Literal[ItemType.inline] # type: ignore[valid-type]


MESSAGE_TYPE_TO_CLASS = {
MessageType.aggregate: PendingAggregateMessage,
MessageType.forget: PendingForgetMessage,
Expand Down
28 changes: 21 additions & 7 deletions src/aleph/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
from hashlib import sha256
from typing import Any, IO, Optional, cast, Final
from aiohttp import web

from aleph_message.models import ItemType

Expand All @@ -19,9 +20,13 @@
from aleph.services.ipfs.common import get_cid_version
from aleph.services.p2p.http import request_hash as p2p_http_request_hash
from aleph.services.storage.engine import StorageEngine
from aleph.toolkit.constants import MiB
from aleph.types.db_session import DbSession
from aleph.types.files import FileType
from aleph.utils import get_sha256
from aleph.schemas.pending_messages import (
parse_message,
)

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -239,7 +244,9 @@ async def get_json(
async def pin_hash(self, chash: str, timeout: int = 30, tries: int = 1):
    """Pin a content hash on the IPFS node via the IPFS service.

    :param chash: Content hash (CID) to pin.
    :param timeout: Timeout in seconds forwarded to the IPFS service.
    :param tries: Number of pin attempts forwarded to the IPFS service.
    """
    await self.ipfs_service.pin_add(cid=chash, timeout=timeout, tries=tries)

async def add_json(self, session: DbSession, value: Any, engine: ItemType = ItemType.ipfs) -> str:
async def add_json(
self, session: DbSession, value: Any, engine: ItemType = ItemType.ipfs
) -> str:
content = aleph_json.dumps(value)

if engine == ItemType.ipfs:
Expand All @@ -259,6 +266,17 @@ async def add_json(self, session: DbSession, value: Any, engine: ItemType = Item

return chash

async def add_file_content_to_local_storage(
    self, session: DbSession, file_content: bytes, file_hash: str
) -> None:
    """Write file content to the local storage engine and record it in the DB.

    The content is stored under its hash, then a file row is upserted with
    the content size and FILE type. The caller is responsible for the
    session lifecycle (no commit is performed here).

    :param session: DB session used for the file upsert.
    :param file_content: Raw bytes of the file to store.
    :param file_hash: Content hash, used both as storage filename and DB key.
    """
    await self.storage_engine.write(filename=file_hash, content=file_content)
    upsert_file(
        session=session,
        file_hash=file_hash,
        size=len(file_content),
        file_type=FileType.FILE,
    )

async def add_file(
self, session: DbSession, fileobject: IO, engine: ItemType = ItemType.ipfs
) -> str:
Expand All @@ -275,12 +293,8 @@ async def add_file(
else:
raise ValueError(f"Unsupported item type: {engine}")

await self.storage_engine.write(filename=file_hash, content=file_content)
upsert_file(
session=session,
file_hash=file_hash,
size=len(file_content),
file_type=FileType.FILE,
await self.add_file_content_to_local_storage(
session=session, file_content=file_content, file_hash=file_hash
)

return file_hash
7 changes: 6 additions & 1 deletion src/aleph/web/controllers/app_state_getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from aleph_p2p_client import AlephP2PServiceClient
from configmanager import Config

from aleph.chains.chain_service import ChainService
from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService
from aleph.storage import StorageService
Expand All @@ -27,7 +28,7 @@
APP_STATE_P2P_CLIENT = "p2p_client"
APP_STATE_SESSION_FACTORY = "session_factory"
APP_STATE_STORAGE_SERVICE = "storage_service"

APP_STATE_CHAIN_SERVICE = "chain_service"

T = TypeVar("T")

Expand Down Expand Up @@ -103,3 +104,7 @@ def get_session_factory_from_request(request: web.Request) -> DbSessionFactory:

def get_storage_service_from_request(request: web.Request) -> StorageService:
    """Return the StorageService instance held in the aiohttp app state."""
    service = request.app[APP_STATE_STORAGE_SERVICE]
    return cast(StorageService, service)


def get_chain_service_from_request(request: web.Request) -> ChainService:
    """Return the ChainService instance held in the aiohttp app state."""
    service = request.app[APP_STATE_CHAIN_SERVICE]
    return cast(ChainService, service)
9 changes: 7 additions & 2 deletions src/aleph/web/controllers/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
get_ipfs_service_from_request,
get_session_factory_from_request,
)
from aleph.web.controllers.utils import multidict_proxy_to_io
from aleph.web.controllers.utils import file_field_to_io


async def ipfs_add_file(request: web.Request):
Expand All @@ -20,7 +20,12 @@ async def ipfs_add_file(request: web.Request):

# No need to pin it here anymore.
post = await request.post()
ipfs_add_response = await ipfs_service.add_file(multidict_proxy_to_io(post))
try:
file_field = post["file"]
except KeyError:
raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.")

ipfs_add_response = await ipfs_service.add_file(file_field_to_io(file_field))

cid = ipfs_add_response["Hash"]
name = ipfs_add_response["Name"]
Expand Down
158 changes: 17 additions & 141 deletions src/aleph/web/controllers/p2p.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,32 @@
import asyncio
import json
import logging
from typing import Dict, cast, Optional, Any, Mapping, List, Union
from typing import Dict, cast, Optional, Any, List, Union

import aio_pika.abc
from aiohttp import web
from aleph_p2p_client import AlephP2PServiceClient
from configmanager import Config
from pydantic import BaseModel, Field, ValidationError

import aleph.toolkit.json as aleph_json
from aleph.schemas.pending_messages import parse_message, BasePendingMessage
from aleph.services.ipfs import IpfsService
from aleph.services.p2p.pubsub import publish as pub_p2p
from aleph.toolkit.shield import shielded
from aleph.types.message_status import (
InvalidMessageException,
MessageStatus,
MessageProcessingStatus,
)
from aleph.types.protocol import Protocol
from aleph.web.controllers.app_state_getters import (
get_config_from_request,
get_ipfs_service_from_request,
get_p2p_client_from_request,
get_mq_channel_from_request,
)
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue
from aleph.web.controllers.utils import (
validate_message_dict,
broadcast_and_process_message,
PublicationStatus,
broadcast_status_to_http_status,
)

LOGGER = logging.getLogger(__name__)


class PublicationStatus(BaseModel):
    """Outcome of broadcasting a message on the network transports.

    status: "success" (no failed publication), "warning" (exactly one
    failed) or "error" (two or more failed).
    failed: the protocols on which publication failed.
    """

    status: str
    failed: List[Protocol]

    @classmethod
    def from_failures(cls, failed_publications: List[Protocol]):
        """Build a status from the list of failed publications.

        The previous implementation indexed a dict keyed on
        len(failed_publications) and raised KeyError for 3+ failures; this
        mapping is total while returning the same values for 0, 1 and 2.
        """
        n_failures = len(failed_publications)
        if n_failures == 0:
            status = "success"
        elif n_failures == 1:
            status = "warning"
        else:
            status = "error"
        return cls(status=status, failed=failed_publications)


def _validate_message_dict(message_dict: Mapping[str, Any]) -> BasePendingMessage:
    """Parse a raw message dict into a pending message object.

    :param message_dict: Raw message payload as received from the client.
    :return: The parsed pending message.
    :raises web.HTTPUnprocessableEntity: if the dict is not a valid Aleph
        message, so request handlers can call this directly.
    """
    try:
        return parse_message(message_dict)
    except InvalidMessageException as e:
        raise web.HTTPUnprocessableEntity(body=str(e))


def _validate_request_data(config: Config, request_data: Dict) -> None:
"""
Validates the content of a JSON pubsub message depending on the channel
Expand Down Expand Up @@ -83,7 +58,7 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:
reason="'data': must be deserializable as JSON."
)

_validate_message_dict(message_dict)
validate_message_dict(message_dict)


async def _pub_on_p2p_topics(
Expand Down Expand Up @@ -142,48 +117,11 @@ async def pub_json(request: web.Request):
)


async def _mq_read_one_message(
    mq_queue: aio_pika.abc.AbstractQueue, timeout: float
) -> Optional[aio_pika.abc.AbstractIncomingMessage]:
    """
    Consume one element from a message queue and then return.

    Registers a consumer on the queue, waits up to `timeout` seconds for a
    single message, and always cancels the consumer before returning.

    :param mq_queue: RabbitMQ queue to consume from.
    :param timeout: Maximum time to wait for a message, in seconds.
    :return: The first incoming message, or None if the timeout elapsed.
    """

    # Bridge the aio-pika callback-style consumer into an awaitable queue.
    queue: asyncio.Queue = asyncio.Queue()

    async def _process_message(message: aio_pika.abc.AbstractMessage):
        await queue.put(message)

    # no_ack=True: messages are delivered without requiring acknowledgement.
    consumer_tag = await mq_queue.consume(_process_message, no_ack=True)

    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None
    finally:
        # Stop consuming in all cases so the consumer does not leak.
        await mq_queue.cancel(consumer_tag)


def _processing_status_to_http_status(status: MessageProcessingStatus) -> int:
    """Map a message processing outcome to the HTTP status code for the client.

    Processed messages (new or confirmation) map to 200, retryable failures
    to 202 (Accepted), and permanent rejections to 422.
    """
    return {
        MessageProcessingStatus.PROCESSED_NEW_MESSAGE: 200,
        MessageProcessingStatus.PROCESSED_CONFIRMATION: 200,
        MessageProcessingStatus.FAILED_WILL_RETRY: 202,
        MessageProcessingStatus.FAILED_REJECTED: 422,
    }[status]


class PubMessageRequest(BaseModel):
    """Body of a message publication request.

    The message payload is accepted under the "message" key (aliased to
    ``message_dict``). When ``sync`` is True the endpoint waits for the
    message to be processed before responding.
    """

    sync: bool = False
    message_dict: Dict[str, Any] = Field(alias="message")


class PubMessageResponse(BaseModel):
    """Response body of a message publication request."""

    # Outcome of broadcasting the message on the network transports.
    publication_status: PublicationStatus
    # None when publication failed before the message could be processed.
    message_status: Optional[MessageStatus]


@shielded
async def pub_message(request: web.Request):
try:
Expand All @@ -194,76 +132,14 @@ async def pub_message(request: web.Request):
# Body must be valid JSON
raise web.HTTPUnprocessableEntity()

pending_message = _validate_message_dict(request_data.message_dict)

# In sync mode, wait for a message processing event. We need to create the queue
# before publishing the message on P2P topics in order to guarantee that the event
# will be picked up.
config = get_config_from_request(request)

if request_data.sync:
mq_channel = await get_mq_channel_from_request(request=request, logger=LOGGER)
mq_queue = await mq_make_aleph_message_topic_queue(
channel=mq_channel,
config=config,
routing_key=f"*.{pending_message.item_hash}",
)
else:
mq_queue = None

# We publish the message on P2P topics early, for 3 reasons:
# 1. Just because this node is unable to process the message does not
# necessarily mean the message is incorrect (ex: bug in a new version).
# 2. If the publication fails after the processing, we end up in a situation where
# a message exists without being propagated to the other nodes, ultimately
# causing sync issues on the network.
# 3. The message is currently fed to this node using the P2P service client
# loopback mechanism.
ipfs_service = get_ipfs_service_from_request(request)
p2p_client = get_p2p_client_from_request(request)

message_topic = config.aleph.queue_topic.value
failed_publications = await _pub_on_p2p_topics(
p2p_client=p2p_client,
ipfs_service=ipfs_service,
topic=message_topic,
payload=aleph_json.dumps(request_data.message_dict),
pending_message = validate_message_dict(request_data.message_dict)
broadcast_status = await broadcast_and_process_message(
pending_message=pending_message,
message_dict=request_data.message_dict,
sync=request_data.sync,
request=request,
logger=LOGGER,
)
pub_status = PublicationStatus.from_failures(failed_publications)
if pub_status.status == "error":
return web.json_response(
text=PubMessageResponse(
publication_status=pub_status, message_status=None
).json(),
status=500,
)

status = PubMessageResponse(
publication_status=pub_status, message_status=MessageStatus.PENDING
)

# When publishing in async mode, just respond with 202 (Accepted).
message_accepted_response = web.json_response(text=status.json(), status=202)
if not request_data.sync:
return message_accepted_response

# Ignore type checking here, we know that mq_queue is set at this point
assert mq_queue is not None
response = await _mq_read_one_message(mq_queue, timeout=30)

# Delete the queue immediately
await mq_queue.delete(if_empty=False)

# If the message was not processed before the timeout, return a 202.
if response is None:
return message_accepted_response

routing_key = response.routing_key
assert routing_key is not None # again, for type checking
status_str, _item_hash = routing_key.split(".")
processing_status = MessageProcessingStatus(status_str)
status_code = _processing_status_to_http_status(processing_status)

status.message_status = processing_status.to_message_status()

return web.json_response(text=status.json(), status=status_code)
status_code = broadcast_status_to_http_status(broadcast_status)
return web.json_response(text=broadcast_status.json(), status=status_code)
Loading