Skip to content

Commit dd702df

Browse files
committed
wip repair
1 parent b94be05 commit dd702df

File tree

4 files changed

+141
-56
lines changed

4 files changed

+141
-56
lines changed

src/aleph/ccn_cli/commands/garbage_collector.py

Lines changed: 4 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,23 @@
55
"""
66
import asyncio
77
import datetime as dt
8-
from typing import Any, Dict, FrozenSet, List, Optional
8+
from typing import Dict, FrozenSet
99
from typing import cast
1010

1111
import pytz
1212
import typer
13-
from aleph_message.models import MessageType
1413
from configmanager import Config
1514

1615
import aleph.model
1716
from aleph.ccn_cli.cli_config import CliConfig
1817
from aleph.config import get_defaults
1918
from aleph.model import init_db_globals
20-
from aleph.model.filepin import PermanentPin
2119
from aleph.model.hashes import delete_value as delete_gridfs_file
22-
from aleph.model.messages import Message
20+
from .toolkit.local_storage import list_expected_local_files
2321

2422
gc_ns = typer.Typer()
2523

2624

27-
async def get_hashes(
28-
item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None
29-
) -> FrozenSet[str]:
30-
def rgetitem(dictionary: Any, fields: List[str]) -> Any:
31-
value = dictionary[fields[0]]
32-
if len(fields) > 1:
33-
return rgetitem(value, fields[1:])
34-
return value
35-
36-
filters = {
37-
# Check if the hash field exists in case the message was forgotten
38-
item_hash_field: {"$exists": 1},
39-
item_type_field: {"$in": ["ipfs", "storage"]},
40-
}
41-
if msg_type:
42-
filters["type"] = msg_type
43-
44-
hashes = [
45-
rgetitem(msg, item_hash_field.split("."))
46-
async for msg in Message.collection.find(
47-
filters,
48-
{item_hash_field: 1},
49-
batch_size=1000,
50-
)
51-
]
52-
53-
# Temporary fix for api2. A message has a list of dicts as item hash.
54-
hashes = [h for h in hashes if isinstance(h, str)]
55-
56-
return frozenset(hashes)
57-
58-
5925
def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
6026
typer.echo("The following files will be preserved:")
6127
for file_type, files in files_to_preserve.items():
@@ -79,26 +45,8 @@ async def list_files_to_preserve(
7945
]
8046
)
8147

82-
# Get all the messages that potentially store data in local storage:
83-
# * any message with item_type in ["storage", "ipfs"]
84-
# * STOREs with content.item_type in ["storage", "ipfs"]
85-
files_to_preserve_dict["non-inline messages"] = await get_hashes(
86-
item_type_field="item_type",
87-
item_hash_field="item_hash",
88-
)
89-
files_to_preserve_dict["stores"] = await get_hashes(
90-
item_type_field="content.item_type",
91-
item_hash_field="content.item_hash",
92-
msg_type=MessageType.store,
93-
)
94-
95-
# We also keep permanent pins, even if they are also stored on IPFS
96-
files_to_preserve_dict["file pins"] = frozenset(
97-
[
98-
pin["multihash"]
99-
async for pin in PermanentPin.collection.find({}, {"multihash": 1})
100-
]
101-
)
48+
expected_local_files = await list_expected_local_files()
49+
files_to_preserve_dict.update(expected_local_files)
10250

10351
return files_to_preserve_dict
10452

src/aleph/ccn_cli/commands/repair.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
Repairs the local CCN by checking that the files expected in local storage are actually present.
3+
"""
4+
import asyncio
5+
import itertools
6+
from typing import Dict, FrozenSet
7+
from typing import cast
8+
9+
import typer
10+
from configmanager import Config
11+
12+
import aleph.model
13+
from aleph.ccn_cli.cli_config import CliConfig
14+
from aleph.config import get_defaults
15+
from aleph.model import init_db_globals
16+
from .toolkit.local_storage import list_expected_local_files
17+
18+
repair_ns = typer.Typer()
19+
20+
21+
22+
def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
    """
    Echo a summary of the files that will be kept.

    Prints a header line, then one bullet per category showing how many
    file hashes that category holds.

    :param files_to_preserve: Mapping of category label to the set of file
                              hashes in that category.
    """
    typer.echo("The following files will be preserved:")
    for category in files_to_preserve:
        count = len(files_to_preserve[category])
        typer.echo(f"* {count} {category}")
26+
27+
28+
async def list_missing_files() -> FrozenSet[str]:
    """
    Compute which expected files are absent from GridFS.

    Reads the filenames recorded in the ``fs.files`` collection, echoes how
    many were found, then subtracts them from the union of every category
    returned by ``list_expected_local_files()``.

    :return: The expected file hashes that have no matching GridFS filename.
    """
    # Only the filename is used below, so it is the only projected field and
    # the names are collected straight into a set instead of keeping every
    # file document in memory.
    gridfs_files = frozenset(
        {
            file["filename"]
            async for file in aleph.model.db["fs.files"].find(
                projection={"_id": 0, "filename": 1},
                batch_size=1000,
            )
        }
    )
    typer.echo(f"Found {len(gridfs_files)} files in local storage.")

    # The expected files come grouped by category; the grouping is irrelevant
    # here, so flatten all categories into a single set.
    expected_local_files_dict = await list_expected_local_files()
    expected_local_files = frozenset(
        itertools.chain.from_iterable(expected_local_files_dict.values())
    )

    return expected_local_files - gridfs_files
46+
47+
48+
async def fetch_missing_files():
    """
    Report how many expected files are missing from local storage.

    At this stage the function only computes the set of missing files and
    echoes its size; nothing is downloaded.
    """
    missing = await list_missing_files()
    missing_count = len(missing)
    typer.echo(f"Found {missing_count} missing files.")
51+
52+
53+
async def run(ctx: typer.Context):
    """
    Load the node configuration, initialize the DB globals and run the repair.

    :param ctx: Typer context whose ``obj`` is the CLI configuration
                (provides ``config_file_path``).
    :raises ValueError: If ``aleph.model.db`` is still ``None`` after
                        ``init_db_globals``.
    """
    config = Config(schema=get_defaults())
    config_path = str(cast(CliConfig, ctx.obj).config_file_path)
    config.yaml.load(config_path)

    init_db_globals(config=config)
    # Checked on the module attribute itself so mypy can narrow its type.
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    await fetch_missing_files()
    typer.echo("Done.")
65+
66+
67+
# Synchronous entry point for the "run" sub-command: drives the async `run`
# coroutine to completion on a fresh event loop.
# NOTE: documented with comments rather than a docstring on purpose, since a
# docstring on a Typer command is shown as its help text.
@repair_ns.command(name="run")
def run_repair(ctx: typer.Context):
    repair_coroutine = run(ctx)
    asyncio.run(repair_coroutine)

src/aleph/ccn_cli/commands/toolkit/__init__.py

Whitespace-only changes.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from typing import Any, Dict, FrozenSet, List, Optional
2+
from aleph.model.messages import Message
3+
from aleph.model.filepin import PermanentPin
4+
from aleph_message.models import MessageType
5+
6+
7+
async def get_hashes(
    item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None
) -> FrozenSet[str]:
    """
    Collect the hashes stored under a (possibly nested) field of the messages.

    Only messages where the hash field exists and the type field is "ipfs" or
    "storage" are considered.

    :param item_type_field: Dotted path of the field holding the item type.
    :param item_hash_field: Dotted path of the field holding the item hash.
    :param msg_type: When given, restrict the query to this message type.
    :return: The distinct string hashes found; non-string values are dropped.
    """
    hash_path = item_hash_field.split(".")

    def lookup(document: Any) -> Any:
        # Walk the dotted path one key at a time down the nested document.
        node = document
        for key in hash_path:
            node = node[key]
        return node

    query: Dict[str, Any] = {
        # Check if the hash field exists in case the message was forgotten
        item_hash_field: {"$exists": 1},
        item_type_field: {"$in": ["ipfs", "storage"]},
    }
    if msg_type:
        query["type"] = msg_type

    cursor = Message.collection.find(
        query,
        {item_hash_field: 1},
        batch_size=1000,
    )

    found = set()
    async for message in cursor:
        candidate = lookup(message)
        # Temporary fix for api2. A message has a list of dicts as item hash.
        if isinstance(candidate, str):
            found.add(candidate)

    return frozenset(found)
37+
38+
39+
async def list_expected_local_files() -> Dict[str, FrozenSet[str]]:
    """
    List the files that should be present on the local storage of the CCN.

    The result is grouped by category:
    * "non-inline messages": content of any message with item_type in
      ["storage", "ipfs"]
    * "stores": stored content of any STORE message with content.item_type in
      ["storage", "ipfs"]
    * "file pins": the multihash of every permanent pin

    :return: Mapping of category label to the set of file hashes it contains.
    """
    message_hashes = await get_hashes(
        item_type_field="item_type",
        item_hash_field="item_hash",
    )
    store_hashes = await get_hashes(
        item_type_field="content.item_type",
        item_hash_field="content.item_hash",
        msg_type=MessageType.store,
    )

    # We also keep permanent pins, even if they are also stored on IPFS
    pinned_hashes = set()
    async for pin in PermanentPin.collection.find({}, {"multihash": 1}):
        pinned_hashes.add(pin["multihash"])

    return {
        "non-inline messages": message_hashes,
        "stores": store_hashes,
        "file pins": frozenset(pinned_hashes),
    }

0 commit comments

Comments
 (0)