diff --git a/deployment/docker-build/pyaleph.dockerfile b/deployment/docker-build/pyaleph.dockerfile index d743a2fa4..7f9ebfed1 100644 --- a/deployment/docker-build/pyaleph.dockerfile +++ b/deployment/docker-build/pyaleph.dockerfile @@ -67,7 +67,6 @@ RUN /opt/venv/bin/pip install --no-cache-dir -r /opt/build/requirements.txt RUN rm /opt/build/requirements.txt # === Install the CCN itself === -COPY deployment/migrations /opt/pyaleph/migrations COPY setup.py /opt/pyaleph/ COPY src /opt/pyaleph/src # Git data is used to determine the version of the CCN diff --git a/deployment/migrations/config_updater.py b/deployment/migrations/config_updater.py deleted file mode 100755 index 0608ac85b..000000000 --- a/deployment/migrations/config_updater.py +++ /dev/null @@ -1,164 +0,0 @@ -#!/usr/bin/env python3 -""" -Checks and updates the configuration and key files of an Aleph node. - -This script imports the migration scripts in the scripts/ directory one by one -and runs the `upgrade` function they must all implement. Alternatively, -it can revert operations using the `downgrade` function, if implemented. - -Migration scripts must implement a check to verify if they need to run -as the tool itself has no way to determine which scripts have already -been executed in the past. -""" - -import argparse -import asyncio -import importlib.util -import logging -import sys -from pathlib import Path -from types import ModuleType -from typing import Iterable, Optional - -from configmanager import Config - -from aleph.config import get_defaults -from aleph.model import init_db_globals - -LOGGER = logging.getLogger() - -SERIALIZED_KEY_FILE = "serialized-node-secret.key" - - -def cli_parse() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Checks and updates the configuration and key files of an Aleph node." 
- ) - parser.add_argument( - "command", - action="store", - choices=("upgrade", "downgrade"), - type=str, - help="Path to setup.cfg.", - ) - parser.add_argument( - "--config", - action="store", - required=True, - type=str, - help="Path to the Core Channel Node configuration file.", - ) - parser.add_argument( - "--key-dir", - action="store", - required=True, - type=str, - help="Path to the key directory.", - ) - parser.add_argument( - "--key-file", - action="store", - required=False, - type=str, - help="Path to the private key file, if any. Only used to upgrade the key to the latest format.", - ) - parser.add_argument( - "--filter-scripts", - action="store", - required=False, - type=str, - help="A filter for migration scripts. If specified, only the files " - "matching the provided glob expression will be run.", - ) - parser.add_argument( - "--verbose", - "-v", - help="Show more information.", - action="store_true", - default=False, - ) - return parser.parse_args() - - -def setup_logging(log_level: int) -> None: - logformat = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - logging.basicConfig( - level=log_level, - stream=sys.stdout, - format=logformat, - datefmt="%Y-%m-%d %H:%M:%S", - ) - - -def init_config(config_file: str) -> Config: - config = Config(schema=get_defaults()) - config.yaml.load(config_file) - return config - - -def import_module_from_path(path: str) -> ModuleType: - spec = importlib.util.spec_from_file_location("migration_module", path) - migration_module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(migration_module) - return migration_module - - -def list_migration_scripts( - migrations_dir: Path, glob_expression: Optional[str] -) -> Iterable[Path]: - migration_scripts = set(migrations_dir.glob("*.py")) - if glob_expression: - migration_scripts = migration_scripts & set( - migrations_dir.glob(glob_expression) - ) - - return migration_scripts - - -async def main(args: argparse.Namespace): - log_level = logging.DEBUG if 
args.verbose else logging.INFO - setup_logging(log_level) - - # Initialize some basic config and global variables - config = init_config(args.config) - init_db_globals(config=config) - - migration_scripts_dir = Path(__file__).parent / "scripts" - migration_scripts = sorted( - list_migration_scripts(migration_scripts_dir, args.filter_scripts) - ) - - command = args.command - - for migration_script in migration_scripts: - migration_script_path = migration_scripts_dir / migration_script - migration_module = import_module_from_path(str(migration_script_path)) - - if args.verbose: - LOGGER.info(f"%s: %s", migration_script, migration_module.__doc__) - LOGGER.info(f"Running %s for %s...", command, migration_script) - migration_func = getattr(migration_module, args.command) - - kwargs = { - "config_file": args.config, - "key_dir": args.key_dir, - "key_file": args.key_file, - "config": config, - } - - if asyncio.iscoroutinefunction(migration_func): - await migration_func(**kwargs) - else: - migration_func(**kwargs) - - LOGGER.info( - f"Successfully ran %s. You can now start the Core Channel Node.", command - ) - - -if __name__ == "__main__": - try: - asyncio.run(main(cli_parse())) - except Exception as e: - LOGGER.error("%s", str(e)) - sys.exit(1) diff --git a/setup.cfg b/setup.cfg index 47a3d2519..9f7d8bbac 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,7 +29,9 @@ package_dir = setup_requires = pyscaffold>=3.1a0,<3.2a0 pytest-runner>=2.0,<3dev -# Add here dependencies of your project (semicolon/line-separated), e.g. + +# Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts. +# Update with care. Dependencies that were added to make it all work are annotated accordingly. 
install_requires = aiocache==0.11.1 aiohttp-cors==0.7.0 @@ -38,13 +40,15 @@ install_requires = aiohttp==3.8.1 aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7 aleph-client==0.4.6 - aleph-message==0.2.1 + aleph-message==0.2.2 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c coincurve==15.0.1 configmanager==1.35.1 configparser==5.2.0 cosmospy==6.0.0 dataclasses_json==0.5.6 + eciespy==0.3.11 # eth dependency + eth-hash==0.3.3 # eth dependency eth-keys==0.3.3 eth-rlp==0.2.1 eth_account==0.5.6 @@ -53,17 +57,19 @@ install_requires = msgpack==1.0.3 # required by aiocache nuls2-python@git+https://github.com/aleph-im/nuls2-python.git p2pclient==0.2.0 + protobuf==3.20.3 # eth dependency pymongo==3.12.3 pynacl==1.5.0 python-dateutil==2.8.2 python-socketio==5.5.1 pytz==2021.3 pyyaml==6.0 - requests==2.27.1 + requests==2.28.1 secp256k1==0.14.0 sentry-sdk==1.5.11 setproctitle==1.2.2 substrate-interface==1.1.7 + typer==0.4.1 ujson==5.1.0 # required by aiocache urllib3==1.26.8 uvloop==0.16.0 @@ -96,6 +102,8 @@ testing = pytest-aiohttp pytest-asyncio pytest-mock + types-pytz + types-pyyaml types-requests types-setuptools nuls2 = @@ -111,6 +119,7 @@ docs = # Add here console scripts like: console_scripts = pyaleph = aleph.commands:run + ccn_cli = aleph.ccn_cli.main:app # For example: # console_scripts = # fibonacci = pyaleph.skeleton:run diff --git a/deployment/migrations/__init__.py b/src/aleph/ccn_cli/__init__.py similarity index 100% rename from deployment/migrations/__init__.py rename to src/aleph/ccn_cli/__init__.py diff --git a/src/aleph/ccn_cli/cli_config.py b/src/aleph/ccn_cli/cli_config.py new file mode 100644 index 000000000..96f49fd29 --- /dev/null +++ b/src/aleph/ccn_cli/cli_config.py @@ -0,0 +1,14 @@ +""" +Global configuration object for the CLI. Use the `get_cli_config()` method +to access and modify the configuration. 
+""" + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class CliConfig: + config_file_path: Path + key_dir: Path + verbose: bool diff --git a/src/aleph/ccn_cli/commands/__init__.py b/src/aleph/ccn_cli/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/aleph/ccn_cli/commands/garbage_collector.py b/src/aleph/ccn_cli/commands/garbage_collector.py new file mode 100644 index 000000000..20b05268f --- /dev/null +++ b/src/aleph/ccn_cli/commands/garbage_collector.py @@ -0,0 +1,114 @@ +""" +This migration checks all the files stored in local storage (=GridFS) and compares them to the list +of messages already on the node. The files that are not linked to any message are scheduled for +deletion. +""" +import asyncio +import datetime as dt +from typing import Dict, FrozenSet +from typing import cast + +import pytz +import typer +from configmanager import Config + +import aleph.model +from aleph.ccn_cli.cli_config import CliConfig +from aleph.config import get_defaults +from aleph.model import init_db_globals +from aleph.model.hashes import delete_value as delete_gridfs_file +from .toolkit.local_storage import list_expected_local_files + +gc_ns = typer.Typer() + + +def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None: + typer.echo("The following files will be preserved:") + for file_type, files in files_to_preserve.items(): + typer.echo(f"* {len(files)} {file_type}") + + +async def list_files_to_preserve( + gridfs_files_dict: Dict[str, Dict], + temporary_files_ttl: int, +) -> Dict[str, FrozenSet[str]]: + files_to_preserve_dict = {} + + # Preserve any file that was uploaded less than an hour ago + current_datetime = pytz.utc.localize(dt.datetime.utcnow()) + files_to_preserve_dict["temporary files"] = frozenset( + [ + file["filename"] + for file in gridfs_files_dict.values() + if file["uploadDate"] + > current_datetime - dt.timedelta(seconds=temporary_files_ttl) + ] + ) + + 
expected_local_files = await list_expected_local_files() + files_to_preserve_dict.update(expected_local_files) + + return files_to_preserve_dict + + +async def run(ctx: typer.Context, dry_run: bool): + config = Config(schema=get_defaults()) + cli_config = cast(CliConfig, ctx.obj) + config.yaml.load(str(cli_config.config_file_path)) + + init_db_globals(config=config) + if aleph.model.db is None: # for mypy + raise ValueError("DB not initialized as expected.") + + # Get a set of all the files currently in GridFS + gridfs_files_dict = { + file["filename"]: file + async for file in aleph.model.db["fs.files"].find( + projection={"_id": 0, "filename": 1, "length": 1, "uploadDate": 1}, + batch_size=1000, + ) + } + gridfs_files = frozenset(gridfs_files_dict.keys()) + + typer.echo(f"Found {len(gridfs_files_dict)} files in local storage.") + + files_to_preserve_dict = await list_files_to_preserve( + gridfs_files_dict=gridfs_files_dict, + temporary_files_ttl=config.storage.temporary_files_ttl.value, + ) + files_to_preserve = frozenset().union(*files_to_preserve_dict.values()) + files_to_delete = gridfs_files - files_to_preserve + + if cli_config.verbose: + print_files_to_preserve(files_to_preserve_dict) + + restored_memory = sum( + gridfs_files_dict[filename]["length"] for filename in files_to_delete + ) + typer.echo( + f"{len(files_to_delete)} will be deleted, totaling {restored_memory} bytes." + ) + + if dry_run: + if cli_config.verbose: + if files_to_delete: + typer.echo("The following files will be deleted:") + for file_to_delete in files_to_delete: + typer.echo(f"* {file_to_delete}") + + else: + for file_to_delete in files_to_delete: + typer.echo(f"Deleting {file_to_delete}...") + await delete_gridfs_file(file_to_delete) + + typer.echo("Done.") + + +@gc_ns.command(name="run") +def run_gc( + ctx: typer.Context, + dry_run: bool = typer.Option( + False, help="If set, display files to delete without deleting them." 
+ ), +): + asyncio.run(run(ctx, dry_run)) diff --git a/src/aleph/ccn_cli/commands/keys.py b/src/aleph/ccn_cli/commands/keys.py new file mode 100644 index 000000000..c9b8c3413 --- /dev/null +++ b/src/aleph/ccn_cli/commands/keys.py @@ -0,0 +1,46 @@ +import typer + +from aleph.ccn_cli.cli_config import CliConfig +from typing import cast +from aleph.ccn_cli.services.keys import generate_keypair, save_keys + +keys_ns = typer.Typer() + + +@keys_ns.command() +def generate(ctx: typer.Context): + """ + Generates a new set of private/public keys for the Core Channel Node. + The keys will be created in the key directory. You can modify the destination + by using the --key-dir option. + """ + cli_config = cast(CliConfig, ctx.obj) + print(cli_config) + + typer.echo(f"Generating a key pair in {cli_config.key_dir.absolute()}...") + key_pair = generate_keypair() + save_keys(key_pair, str(cli_config.key_dir)) + typer.echo("Done.") + + +@keys_ns.command() +def show(ctx: typer.Context): + """ + Prints the private key of the node. + """ + cli_config = cast(CliConfig, ctx.obj) + + key_path = cli_config.key_dir / "node-secret.key" + if not key_path.exists(): + typer.echo( + f"'{key_path.absolute()}' does not exist. 
Did you run 'keys generate'?", + err=True, + ) + raise typer.Exit(code=1) + + if not key_path.is_file(): + typer.echo(f"'{key_path}' is not a file.", err=True) + raise typer.Exit(code=1) + + with key_path.open() as f: + typer.echo(f.read()) diff --git a/src/aleph/ccn_cli/commands/migrations/__init__.py b/src/aleph/ccn_cli/commands/migrations/__init__.py new file mode 100644 index 000000000..76975bd92 --- /dev/null +++ b/src/aleph/ccn_cli/commands/migrations/__init__.py @@ -0,0 +1,87 @@ +import asyncio +from pathlib import Path +from traceback import format_exc +from typing import Optional +from typing import cast + +import typer + +from aleph.ccn_cli.cli_config import CliConfig +from .migration_runner import run_migrations + +migrations_ns = typer.Typer() + + +FILTER_SCRIPTS_HELP = ( + "A filter for migration scripts. If specified, only the files " + "matching the provided glob expression will be run." +) + +KEY_FILE_HELP = ( + "Path to the private key file, if any. " + "Only used to upgrade the key to the latest format." 
+) + + +def run_migration_command( + cli_config: CliConfig, + command: str, + filter_scripts: Optional[str], + key_file: Optional[Path], +): + try: + asyncio.run( + run_migrations( + cli_config=cli_config, + command=command, + filter_scripts=filter_scripts, + key_file=key_file, + ) + ) + except Exception as e: + typer.echo(f"{command} failed: {e}.", err=True) + if cli_config.verbose: + typer.echo(format_exc()) + raise typer.Exit(code=1) + + +@migrations_ns.command() +def upgrade( + ctx: typer.Context, + filter_scripts: Optional[str] = typer.Option( + None, + help=FILTER_SCRIPTS_HELP, + ), + key_file: Optional[Path] = typer.Option( + None, + help=KEY_FILE_HELP, + ), +): + cli_config = cast(CliConfig, ctx.obj) + run_migration_command( + cli_config=cli_config, + command="upgrade", + filter_scripts=filter_scripts, + key_file=key_file, + ) + + +@migrations_ns.command() +def downgrade( + ctx: typer.Context, + filter_scripts: Optional[str] = typer.Option( + None, + help=FILTER_SCRIPTS_HELP, + ), + key_file: Optional[Path] = typer.Option( + None, + help=KEY_FILE_HELP, + ), +): + cli_config = cast(CliConfig, ctx.obj) + run_migration_command( + cli_config=cli_config, + command="downgrade", + filter_scripts=filter_scripts, + key_file=key_file, + ) diff --git a/src/aleph/ccn_cli/commands/migrations/migration_runner.py b/src/aleph/ccn_cli/commands/migrations/migration_runner.py new file mode 100644 index 000000000..51e8719dd --- /dev/null +++ b/src/aleph/ccn_cli/commands/migrations/migration_runner.py @@ -0,0 +1,93 @@ +import asyncio +import importlib.util +import logging +from pathlib import Path +from types import ModuleType +from typing import Iterable, Optional + +import typer +from configmanager import Config + +from aleph.ccn_cli.cli_config import CliConfig +from aleph.config import get_defaults +from aleph.model import init_db_globals + +migrations_ns = typer.Typer() + +LOGGER = logging.getLogger() + +SERIALIZED_KEY_FILE = "serialized-node-secret.key" + + +def 
init_config(config_file_path: Path) -> Config: + config = Config(schema=get_defaults()) + config.yaml.load(str(config_file_path)) + return config + + +def import_module_from_path(path: str) -> ModuleType: + spec = importlib.util.spec_from_file_location("migration_module", path) + if spec is None: + raise ImportError(f"Could not import migrations from {path}.") + + migration_module = importlib.util.module_from_spec(spec) + if spec.loader is None: + raise ImportError(f"Could not load module from spec for {path}.") + + spec.loader.exec_module(migration_module) + return migration_module + + +def list_migration_scripts( + migrations_dir: Path, glob_expression: Optional[str] +) -> Iterable[Path]: + migration_scripts = set(migrations_dir.glob("*.py")) + if glob_expression: + migration_scripts = migration_scripts & set( + migrations_dir.glob(glob_expression) + ) + + return migration_scripts + + +async def run_migrations( + cli_config: CliConfig, + command: str, + filter_scripts: Optional[str], + key_file: Optional[Path], +): + # Initialize some basic config and global variables + config = init_config(cli_config.config_file_path) + init_db_globals(config=config) + + migration_scripts_dir = Path(__file__).parent / "scripts" + migration_scripts = sorted( + list_migration_scripts(migration_scripts_dir, filter_scripts) + ) + + for migration_script in migration_scripts: + migration_script_path = migration_scripts_dir / migration_script + migration_module = import_module_from_path(str(migration_script_path)) + + typer.echo(f"\n> Running {command} for {migration_script}...") + if cli_config.verbose: + if migration_module.__doc__ is not None: + typer.echo(migration_module.__doc__.lstrip()) + + migration_func = getattr(migration_module, command) + + kwargs = { + "config_file": cli_config.config_file_path, + "key_dir": cli_config.key_dir, + "key_file": key_file, + "config": config, + } + + if asyncio.iscoroutinefunction(migration_func): + await migration_func(**kwargs) + else: + 
migration_func(**kwargs) + + LOGGER.info( + f"Successfully ran %s. You can now start the Core Channel Node.", command + ) diff --git a/deployment/migrations/scripts/0001-update-keys-for-p2pd.py b/src/aleph/ccn_cli/commands/migrations/scripts/0001-update-keys-for-p2pd.py similarity index 84% rename from deployment/migrations/scripts/0001-update-keys-for-p2pd.py rename to src/aleph/ccn_cli/commands/migrations/scripts/0001-update-keys-for-p2pd.py index 15cb1fb9e..adb9593ba 100644 --- a/deployment/migrations/scripts/0001-update-keys-for-p2pd.py +++ b/src/aleph/ccn_cli/commands/migrations/scripts/0001-update-keys-for-p2pd.py @@ -3,22 +3,16 @@ a public key, and a serialized version of the private key for use by the P2P service. """ +from pathlib import Path +from typing import Optional -import logging -import os - +import typer +import yaml from Crypto.PublicKey import RSA from p2pclient.libp2p_stubs.crypto.rsa import KeyPair, RSAPrivateKey from aleph.exceptions import InvalidKeyDirException from aleph.services.keys import save_keys -from pathlib import Path -from typing import Optional - -import yaml - -LOGGER = logging.getLogger(os.path.basename(__file__)) - SERIALIZED_KEY_FILE = "serialized-node-secret.key" @@ -55,9 +49,8 @@ def upgrade(**kwargs): # Nothing to do if the serialized key file already exists serialized_key_file = key_dir / SERIALIZED_KEY_FILE if serialized_key_file.is_file(): - LOGGER.info( - "Serialized key file {%s} already exists, nothing to do", - serialized_key_file, + typer.echo( + f"Serialized key file {serialized_key_file} already exists, nothing to do" ) return @@ -72,19 +65,18 @@ def upgrade(**kwargs): with open(key_file) as f: private_key = f.read() else: - LOGGER.debug("Key file not specified. Looking in the config file...") + typer.echo("Key file not specified. 
Looking in the config file...") private_key = get_key_from_config(config_file) if private_key is None: raise ValueError("Key file path not specified and key not provided in config.") - LOGGER.info( - "Migrating the private key in %s and using it to generate a public key " + typer.echo( + f"Migrating the private key in {key_dir} and using it to generate a public key " "and a serialized private key...", - key_dir, ) populate_key_dir(private_key, key_dir) - LOGGER.info("Migrated the private/public keys in %s.", key_dir) + typer.echo(f"Migrated the private/public keys in {key_dir}.") def downgrade(**kwargs): diff --git a/deployment/migrations/scripts/0002-refresh-chain-pins.py b/src/aleph/ccn_cli/commands/migrations/scripts/0002-refresh-chain-pins.py similarity index 84% rename from deployment/migrations/scripts/0002-refresh-chain-pins.py rename to src/aleph/ccn_cli/commands/migrations/scripts/0002-refresh-chain-pins.py index 27dab2537..ba5bc2792 100644 --- a/deployment/migrations/scripts/0002-refresh-chain-pins.py +++ b/src/aleph/ccn_cli/commands/migrations/scripts/0002-refresh-chain-pins.py @@ -3,14 +3,11 @@ committed on a chain. It has to be run after the introduction of PermanentPin in version 0.2.0. 
""" -import logging - +import typer from configmanager import Config -from aleph.model import PermanentPin from aleph.model.chains import Chain - -logger = logging.getLogger() +from aleph.model.filepin import PermanentPin async def upgrade(config: Config, **kwargs): @@ -19,11 +16,11 @@ async def upgrade(config: Config, **kwargs): expected_permanent_pins = 5000 if (await PermanentPin.count(filter={})) < expected_permanent_pins: - logger.info("PermanentPin documents missing, fetching chaindata again") + typer.echo("PermanentPin documents missing, fetching chaindata again") start_height = config.ethereum.start_height.value await Chain.set_last_height("ETH", start_height) else: - logger.info( + typer.echo( "PermanentPin documents already present, no need to re-fetch chaindata" ) diff --git a/src/aleph/ccn_cli/commands/repair.py b/src/aleph/ccn_cli/commands/repair.py new file mode 100644 index 000000000..c130800bd --- /dev/null +++ b/src/aleph/ccn_cli/commands/repair.py @@ -0,0 +1,160 @@ +""" +Repairs the local CCN by checking the following +""" +import asyncio +import itertools +from typing import Dict, FrozenSet, Set, Tuple +from typing import cast + +import typer +from aleph_message.models import ItemHash +from configmanager import Config + +import aleph.model +import aleph.services.p2p.singleton as singleton +from aleph import config as aleph_config +from aleph.ccn_cli.cli_config import CliConfig +from aleph.config import get_defaults +from aleph.exceptions import ContentCurrentlyUnavailable +from aleph.model import init_db_globals +from aleph.services.p2p import http +from aleph.storage import get_hash_content +from .toolkit.local_storage import list_expected_local_files + +repair_ns = typer.Typer() + + +async def init_api_servers(): + peers = [peer async for peer in aleph.model.db["peers"].find({"type": "HTTP"})] + singleton.api_servers = [peer["address"] for peer in peers] + + +async def list_missing_files() -> FrozenSet[str]: + if aleph.model.db is None: # for 
mypy + raise ValueError("DB not initialized as expected.") + + # Get a set of all the files currently in GridFS + gridfs_files = frozenset( + [ + file["filename"] + async for file in aleph.model.db["fs.files"].find( + projection={"_id": 0, "filename": 1}, + batch_size=1000, + ) + ] + ) + + typer.echo(f"Found {len(gridfs_files)} files in local storage.") + + expected_local_files_dict = await list_expected_local_files() + expected_local_files = frozenset( + itertools.chain.from_iterable(expected_local_files_dict.values()) + ) + + missing_files = expected_local_files - gridfs_files + return missing_files + + +async def fetch_and_store_file(filename: str): + item_hash = ItemHash(filename) + _ = await get_hash_content( + content_hash=filename, + engine=item_hash.item_type, + use_network=True, + use_ipfs=True, + store_value=True, + timeout=15, + ) + + +def process_results( + finished_tasks: Set[asyncio.Task], task_dict: Dict[asyncio.Task, str] +) -> Tuple[Set[str], Set[str]]: + fetched_files = set() + failed_files = set() + + for task in finished_tasks: + filename = task_dict.pop(task) + exception = task.exception() + + if exception is None: + fetched_files.add(filename) + + else: + failed_files.add(filename) + if isinstance(exception, ContentCurrentlyUnavailable): + typer.echo( + f"WARNING: Could not fetch {filename}: currently unavailable." 
+ ) + else: + typer.echo( + f"ERROR: Could not fetch {filename}: unexpected error: {exception}" + ) + + return fetched_files, failed_files + + +async def fetch_files(missing_files: FrozenSet[str], batch_size: int): + tasks = set() + task_dict = {} + + fetched_files = set() + failed_files = set() + + for i, filename in enumerate(missing_files, start=1): + typer.echo(f"Fetching {filename} ({i}/{len(missing_files)})...") + fetch_task = asyncio.create_task(fetch_and_store_file(filename)) + tasks.add(fetch_task) + task_dict[fetch_task] = filename + + if len(tasks) == batch_size: + done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + fetched, failed = process_results(done, task_dict) + fetched_files |= fetched + failed_files |= failed + + # Finish + if tasks: + done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + fetched, failed = process_results(done, task_dict) + fetched_files |= fetched + failed_files |= failed + + typer.echo(f"Successfully fetched {len(fetched_files)} files.") + if failed_files: + typer.echo(f"WARNING: Failed to fetch {len(failed_files)} files.") + + +async def fetch_missing_files(): + missing_files = await list_missing_files() + typer.echo(f"Found {len(missing_files)} missing files.") + + await fetch_files(missing_files, 2000) + + +async def run(ctx: typer.Context): + config = Config(schema=get_defaults()) + cli_config = cast(CliConfig, ctx.obj) + config.yaml.load(str(cli_config.config_file_path)) + + # Set the config global variable, otherwise the IPFS client will not be initialized properly + aleph_config.app_config = config + + init_db_globals(config=config) + # To be able to fetch data from the network + await init_api_servers() + if aleph.model.db is None: # for mypy + raise ValueError("DB not initialized as expected.") + + await fetch_missing_files() + + # Clean up aiohttp client sessions to avoid a warning + for client_session in http.SESSIONS.values(): + await client_session.close() + + 
typer.echo("Done.") + + +@repair_ns.command(name="run") +def run_repair(ctx: typer.Context): + asyncio.run(run(ctx)) diff --git a/src/aleph/ccn_cli/commands/toolkit/__init__.py b/src/aleph/ccn_cli/commands/toolkit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/aleph/ccn_cli/commands/toolkit/local_storage.py b/src/aleph/ccn_cli/commands/toolkit/local_storage.py new file mode 100644 index 000000000..d0f5cab1b --- /dev/null +++ b/src/aleph/ccn_cli/commands/toolkit/local_storage.py @@ -0,0 +1,70 @@ +from typing import Any, Dict, FrozenSet, List, Optional + +from aleph_message.models import MessageType + +from aleph.model.filepin import PermanentPin +from aleph.model.messages import Message + + +async def get_hashes( + item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None +) -> FrozenSet[str]: + def rgetitem(dictionary: Any, fields: List[str]) -> Any: + value = dictionary[fields[0]] + if len(fields) > 1: + return rgetitem(value, fields[1:]) + return value + + filters = { + # Check if the hash field exists in case the message was forgotten + item_hash_field: {"$exists": 1}, + item_type_field: {"$in": ["ipfs", "storage"]}, + } + if msg_type: + filters["type"] = msg_type + + hashes = [ + rgetitem(msg, item_hash_field.split(".")) + async for msg in Message.collection.find( + filters, + {item_hash_field: 1}, + batch_size=1000, + ) + ] + + # Temporary fix for api2. A message has a list of dicts as item hash. + hashes = [h for h in hashes if isinstance(h, str)] + + return frozenset(hashes) + + +async def list_expected_local_files() -> Dict[str, FrozenSet[str]]: + """ + Lists the files that are expected to be found on the local storage of the CCN. 
+ This includes: + * the content of any message with item_type in ["storage", "ipfs"] + * the stored content of any STORE message with content.item_type in ["storage", "ipfs"] + * file pins + """ + + expected_files = {} + + expected_files["non-inline messages"] = await get_hashes( + item_type_field="item_type", + item_hash_field="item_hash", + ) + expected_files["stores"] = await get_hashes( + item_type_field="content.item_type", + item_hash_field="content.item_hash", + msg_type=MessageType.store, + ) + + # We also keep permanent pins, even if they are also stored on IPFS + expected_files["file pins"] = frozenset( + [ + pin["multihash"] + async for pin in PermanentPin.collection.find({}, {"multihash": 1}) + ] + ) + + return expected_files diff --git a/src/aleph/ccn_cli/main.py b/src/aleph/ccn_cli/main.py new file mode 100644 index 000000000..308c25773 --- /dev/null +++ b/src/aleph/ccn_cli/main.py @@ -0,0 +1,80 @@ +from pathlib import Path +from typing import Optional + +import typer + +from .cli_config import CliConfig +from .commands.garbage_collector import gc_ns +from .commands.keys import keys_ns +from .commands.migrations import migrations_ns +from .commands.repair import repair_ns + +app = typer.Typer() + + +def validate_config_file_path(config: Optional[Path]) -> Optional[Path]: + if config is not None: + if not config.is_file(): + raise typer.BadParameter(f"'{config.absolute()}' does not exist") + + return config + + +def validate_key_dir(key_dir: Optional[Path]) -> Optional[Path]: + if key_dir is not None: + if key_dir.exists and not key_dir.is_dir(): + raise typer.BadParameter( + f"'{key_dir.absolute()}' already exists and is not a directory" + ) + + return key_dir + + +@app.callback() +def main( + ctx: typer.Context, + config: Optional[Path] = typer.Option( + None, + help="Path to the configuration file of the CCN. 
Defaults to /config.yml.", + callback=validate_config_file_path, + ), + key_dir: Optional[Path] = typer.Option( + None, + help="Path to the key directory. Defaults to /keys/.", + callback=validate_key_dir, + ), + verbose: bool = typer.Option(False, help="Show more information."), +): + """ + Aleph Core Channel Node CLI for operators. + """ + + cli_config = CliConfig( + config_file_path=Path.cwd() / "config.yml", + key_dir=Path.cwd() / "keys", + verbose=False, + ) + + if config is not None: + cli_config.config_file_path = config + + if key_dir is not None: + cli_config.key_dir = key_dir + + cli_config.verbose = verbose + + ctx.obj = cli_config + + +app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.") +app.add_typer(keys_ns, name="keys", help="Operations on private keys.") +app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.") +app.add_typer( + repair_ns, + name="repair", + help="Performs checks on the local install and fixes issues like missing files.", +) + + +if __name__ == "__main__": + app() diff --git a/src/aleph/ccn_cli/services/__init__.py b/src/aleph/ccn_cli/services/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/aleph/ccn_cli/services/keys.py b/src/aleph/ccn_cli/services/keys.py new file mode 100644 index 000000000..1198da957 --- /dev/null +++ b/src/aleph/ccn_cli/services/keys.py @@ -0,0 +1,43 @@ +import os.path + +from p2pclient.libp2p_stubs.crypto.rsa import ( + KeyPair, + create_new_key_pair, +) + + +def generate_keypair() -> KeyPair: + """ + Generates a new key pair for the node. + """ + key_pair = create_new_key_pair() + return key_pair + + +def save_keys(key_pair: KeyPair, key_dir: str) -> None: + """ + Saves the private and public keys to the specified directory. The keys are stored in 3 formats: + - The private key is stored in PEM format for ease of use, and in a serialized format compatible with the P2P + daemon (DER + protobuf encoding). 
# Default dApp URL for Micheline-style signatures
DEFAULT_DAPP_URL = "aleph.im"


class TezosSignatureType(str, Enum):
    """
    Signature schemes supported for messages signed with a Tezos key.
    """

    # The signature covers the regular Aleph verification buffer as-is.
    RAW = "raw"
    # The signature covers the buffer built by `micheline_verification_buffer()`.
    MICHELINE = "micheline"


def timestamp_to_iso_8601(timestamp: float) -> str:
    """
    Returns the timestamp formatted to ISO-8601, JS-style.

    Compared to the regular `isoformat()`, this function only provides precision down
    to milliseconds and prints a "Z" instead of "+00:00" for UTC.
    This format is typically used by JavaScript applications, like our TS SDK.

    Example: 2022-09-23T14:41:19.029Z

    :param timestamp: The timestamp to format, in seconds since the Unix epoch.
    :return: The formatted timestamp.
    """

    # `datetime.utcfromtimestamp()` is deprecated since Python 3.12; perform the
    # conversion with an explicit UTC timezone instead.
    utc_datetime = dt.datetime.fromtimestamp(timestamp, tz=dt.timezone.utc)
    # Drop the tzinfo so that `isoformat()` does not append "+00:00": the UTC marker
    # is the "Z" suffix added below.
    naive_utc_datetime = utc_datetime.replace(tzinfo=None)
    return naive_utc_datetime.isoformat(timespec="milliseconds") + "Z"


def micheline_verification_buffer(
    verification_buffer: bytes,
    timestamp: float,
    dapp_url: str,
) -> bytes:
    """
    Computes the verification buffer for Micheline-type signatures.

    This verification buffer is used when signing data with a Tezos web wallet.
    See https://tezostaquito.io/docs/signing/#generating-a-signature-with-beacon-sdk.

    :param verification_buffer: The original (non-Tezos) verification buffer for the Aleph message.
    :param timestamp: Timestamp of the message.
    :param dapp_url: The URL of the dApp, for use as part of the verification buffer.
    :return: The verification buffer used for the signature by the web wallet.
    """

    prefix = b"Tezos Signed Message:"
    # Keep the `timestamp` parameter a float and store its encoded form separately.
    encoded_timestamp = timestamp_to_iso_8601(timestamp).encode("utf-8")

    # Space-separated fields: prefix, dApp URL, ISO-8601 timestamp, original buffer.
    payload = b" ".join(
        (prefix, dapp_url.encode("utf-8"), encoded_timestamp, verification_buffer)
    )
    # The size field is the length of the hex-encoded payload (two characters per
    # byte), written as ASCII decimal digits.
    payload_size = str(len(payload.hex())).encode("utf-8")

    # NOTE(review): the fixed header bytes presumably mirror the layout from the
    # Taquito documentation linked above -- confirm before changing, as existing
    # signatures depend on this exact byte sequence.
    return b"\x05" + b"\x01\x00" + payload_size + payload


def get_tezos_verification_buffer(
    message: BasePendingMessage, signature_type: TezosSignatureType, dapp_url: str
) -> bytes:
    """
    Returns the buffer that must have been signed for the given signature type.

    :param message: The pending message to verify.
    :param signature_type: The signature scheme declared in the message signature.
    :param dapp_url: The dApp URL, only used for Micheline-type signatures.
    :return: The verification buffer to check the signature against.
    :raises ValueError: If the signature type is not supported.
    """
    verification_buffer = get_verification_buffer(message)

    if signature_type == TezosSignatureType.RAW:
        return verification_buffer

    if signature_type == TezosSignatureType.MICHELINE:
        return micheline_verification_buffer(
            verification_buffer, message.time, dapp_url
        )

    raise ValueError(f"Unsupported signature type: {signature_type}")
""" - verification_buffer = get_verification_buffer(message) try: signature_dict = json.loads(message.signature) except json.JSONDecodeError: @@ -30,6 +102,9 @@ async def verify_signature(message: BasePendingMessage) -> bool: LOGGER.exception("'%s' key missing from Tezos signature dictionary.", e.args[0]) return False + signature_type = TezosSignatureType(signature_dict.get("signingType", "raw")) + dapp_url = signature_dict.get("dAppUrl", DEFAULT_DAPP_URL) + key = Key.from_encoded_key(public_key) # Check that the sender ID is equal to the public key hash public_key_hash = key.public_key_hash() @@ -41,6 +116,10 @@ async def verify_signature(message: BasePendingMessage) -> bool: public_key_hash, ) + verification_buffer = get_tezos_verification_buffer( + message, signature_type, dapp_url + ) + # Check the signature try: key.verify(signature, verification_buffer) diff --git a/src/aleph/cli/args.py b/src/aleph/cli/args.py index 39c253ef5..9ce5c6982 100644 --- a/src/aleph/cli/args.py +++ b/src/aleph/cli/args.py @@ -46,21 +46,6 @@ def parse_args(args): action="store_const", const=logging.DEBUG, ) - parser.add_argument( - "-g", - "--gen-keys", - dest="generate_keys", - help="Generate a node key and exit", - action="store_true", - default=False, - ) - parser.add_argument( - "--print-key", - dest="print_key", - help="Print the generated private key", - action="store_true", - default=False, - ) parser.add_argument( "-k", "--key-dir", diff --git a/src/aleph/commands.py b/src/aleph/commands.py index f3170c220..568f092a6 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -146,16 +146,6 @@ async def main(args): args = parse_args(args) setup_logging(args.loglevel) - # Generate keys and exit - if args.generate_keys: - LOGGER.info("Generating a key pair") - key_pair = generate_keypair(args.print_key) - save_keys(key_pair, args.key_dir) - if args.print_key: - print(key_pair.private_key.impl.export_key().decode("utf-8")) - - return - LOGGER.info("Loading configuration") 
config = aleph.config.app_config diff --git a/src/aleph/config.py b/src/aleph/config.py index b6229f872..14cca2eba 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -7,7 +7,7 @@ def get_defaults(): return { "logging": { "level": logging.WARNING, - "max_log_file_size": 1_000_000_000 # 1GB, + "max_log_file_size": 1_000_000_000, # 1GB, }, "aleph": { "queue_topic": "ALEPH-QUEUE", @@ -39,7 +39,12 @@ def get_defaults(): "/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U", ], }, - "storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"}, + "storage": { + "folder": "./data/", + "store_files": False, + "engine": "mongodb", + "temporary_files_ttl": 3600, + }, "nuls": { "chain_id": 8964, "enabled": False, @@ -89,7 +94,7 @@ def get_defaults(): "peers": [ "/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx", "/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2", - "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF" + "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF", ], }, "sentry": { diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py index cb5aa69fd..dd35e445d 100644 --- a/src/aleph/jobs/process_pending_messages.py +++ b/src/aleph/jobs/process_pending_messages.py @@ -19,6 +19,7 @@ from aleph.model.pending import PendingMessage from aleph.services.p2p import singleton from .job_utils import prepare_loop, process_job_results +from ..exceptions import InvalidMessageError from ..schemas.pending_messages import parse_message LOGGER = getLogger("jobs.pending_messages") @@ -49,7 +50,15 @@ async def handle_pending_message( seen_ids: Dict[Tuple, int], ) -> List[DbBulkOperation]: - message = parse_message(pending["message"]) + delete_pending_message_op = DbBulkOperation( + PendingMessage, DeleteOne({"_id": pending["_id"]}) + ) + + try: + message = 
@pytest.mark.asyncio
async def test_tezos_verify_signature_micheline():
    """
    Runs signature verification on a Tezos message signed with the "micheline"
    signing type. The signature does not specify a dApp URL.
    """
    signature = '{"signingType":"micheline","signature":"sigXD8iT5ivdawgPzE1AbtDwqqAjJhS5sHS1psyE74YjfiaQnxWZsATNjncdsuQw3b9xaK79krxtsC8uQoT5TcUXmo66aovT","publicKey":"edpkvapDnjnasrNcmUdMZXhQZwpX6viPyuGCq6nrP4W7ZJCm7EFTpS"}'
    raw_message = {
        "chain": "TEZOS",
        "sender": "tz1VrPqrVdMFsgykWyhGH7SYcQ9avHTjPcdD",
        "type": "POST",
        "channel": "ALEPH-TEST",
        "signature": signature,
        "time": 1663944079.029,
        "item_type": "storage",
        "item_hash": "72b2722b95582419cfa71f631ff6c6afc56344dc6a4609e772877621813040b7",
    }

    pending_message = parse_message(raw_message)
    # NOTE(review): the result is not asserted; this presumably relies on
    # `verify_signature` raising when the signature is invalid -- confirm.
    await verify_signature(pending_message)