Skip to content

Commit 913e007

Browse files
authored
Refactor: get_total_cost_for_address + fix View (#466)
Solutions: - Move get_total_cost_for_address from accessors/vm.py to accesors/cost.py - Add unit test - Fix view use correct unit in view calculation (MB to MiB)
1 parent 1630ca7 commit 913e007

File tree

5 files changed

+227
-13
lines changed

5 files changed

+227
-13
lines changed

deployment/migrations/versions/0018_7bcb8e5fe186_fix_vm_cost_view.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def upgrade() -> None:
9393
JOIN files f ON file_pins.file_hash::text = f.hash::text
9494
WHERE file_pins.owner IS NOT NULL
9595
GROUP BY file_pins.owner) storage ON vm_prices.owner::text = storage.owner::text,
96-
LATERAL ( SELECT 3::numeric * storage.storage_size / 1000000::numeric AS total_storage_cost) sc,
96+
LATERAL ( SELECT 3::numeric * storage.storage_size / 1048576::numeric AS total_storage_cost) sc,
9797
LATERAL ( SELECT COALESCE(vm_prices.total_vm_cost, 0::double precision) +
9898
COALESCE(sc.total_storage_cost, 0::numeric)::double precision AS total_cost) tc
9999
"""

src/aleph/db/accessors/cost.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from decimal import Decimal
2+
from typing import Optional
3+
from sqlalchemy import select, func, text
4+
from aleph.types.db_session import DbSession
5+
6+
7+
def get_total_cost_for_address(session: DbSession, address: str) -> Decimal:
8+
select_stmt = (
9+
select(func.sum(text("total_cost")))
10+
.select_from(text("public.costs_view"))
11+
.where(text("address = :address"))
12+
).params(address=address)
13+
14+
total_cost = session.execute(select_stmt).scalar()
15+
return Decimal(total_cost) if total_cost is not None else Decimal(0)

src/aleph/db/accessors/vms.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,3 @@ def refresh_vm_version(session: DbSession, vm_hash: str) -> None:
113113
)
114114
session.execute(delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash))
115115
session.execute(upsert_stmt)
116-
117-
118-
def get_total_cost_for_address(session: DbSession, address: str) -> Decimal:
119-
select_stmt = (
120-
select(func.sum(text("total_cost")))
121-
.select_from(text("public.costs_view"))
122-
.where(text("address = :address"))
123-
).params(address=address)
124-
125-
total_cost = session.execute(select_stmt).scalar()
126-
return Decimal(total_cost) if total_cost is not None else Decimal(0)

src/aleph/handlers/content/vm.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from decimal import Decimal
1919

2020
from aleph.db.accessors.balances import get_total_balance
21+
from aleph.db.accessors.cost import get_total_cost_for_address
2122
from aleph.db.accessors.files import (
2223
find_file_tags,
2324
find_file_pins,
@@ -31,7 +32,6 @@
3132
delete_vm_updates,
3233
refresh_vm_version,
3334
is_vm_amend_allowed,
34-
get_total_cost_for_address,
3535
)
3636
from aleph.db.models import (
3737
MessageDb,

tests/db/test_cost.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
from typing import List, Protocol
2+
3+
import pytest
4+
import pytz
5+
from aleph_message.models import (
6+
Chain,
7+
InstanceContent,
8+
MessageType,
9+
ItemType,
10+
ExecutableContent,
11+
ProgramContent,
12+
)
13+
from decimal import Decimal
14+
15+
from aleph_message.models.execution.volume import ImmutableVolume
16+
17+
from aleph.db.accessors.cost import get_total_cost_for_address
18+
from aleph.db.accessors.files import insert_message_file_pin, upsert_file_tag
19+
from aleph.db.models import (
20+
AlephBalanceDb,
21+
PendingMessageDb,
22+
StoredFileDb,
23+
MessageStatusDb,
24+
)
25+
import json
26+
from aleph.toolkit.timestamp import timestamp_to_datetime
27+
from aleph.types.db_session import DbSessionFactory, DbSession
28+
import datetime as dt
29+
30+
from aleph.types.files import FileType, FileTag
31+
from aleph.types.message_status import MessageStatus
32+
33+
34+
class Volume(Protocol):
35+
ref: str
36+
use_latest: bool
37+
38+
39+
def get_volume_refs(content: ExecutableContent) -> List[Volume]:
40+
volumes = []
41+
42+
for volume in content.volumes:
43+
if isinstance(volume, ImmutableVolume):
44+
volumes.append(volume)
45+
46+
if isinstance(content, ProgramContent):
47+
volumes += [content.code, content.runtime]
48+
if content.data:
49+
volumes.append(content.data)
50+
51+
elif isinstance(content, InstanceContent):
52+
if parent := content.rootfs.parent:
53+
volumes.append(parent)
54+
55+
return volumes
56+
57+
58+
def insert_volume_refs(session: DbSession, message: PendingMessageDb):
59+
"""
60+
Insert volume references in the DB to make the program processable.
61+
"""
62+
63+
content = InstanceContent.parse_raw(message.item_content)
64+
volumes = get_volume_refs(content)
65+
66+
created = pytz.utc.localize(dt.datetime(2023, 1, 1))
67+
68+
for volume in volumes:
69+
# Note: we use the reversed ref to generate the file hash for style points,
70+
# but it could be set to any valid hash.
71+
file_hash = volume.ref[::-1]
72+
73+
session.add(StoredFileDb(hash=file_hash, size=1024 * 1024, type=FileType.FILE))
74+
session.flush()
75+
insert_message_file_pin(
76+
session=session,
77+
file_hash=volume.ref[::-1],
78+
owner=content.address,
79+
item_hash=volume.ref,
80+
ref=None,
81+
created=created,
82+
)
83+
upsert_file_tag(
84+
session=session,
85+
tag=FileTag(volume.ref),
86+
owner=content.address,
87+
file_hash=volume.ref[::-1],
88+
last_updated=created,
89+
)
90+
91+
92+
@pytest.fixture
93+
def fixture_instance_message(session_factory: DbSessionFactory) -> PendingMessageDb:
94+
content = {
95+
"address": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba",
96+
"allow_amend": False,
97+
"variables": {
98+
"VM_CUSTOM_VARIABLE": "SOMETHING",
99+
"VM_CUSTOM_VARIABLE_2": "32",
100+
},
101+
"environment": {
102+
"reproducible": True,
103+
"internet": False,
104+
"aleph_api": False,
105+
"shared_cache": False,
106+
},
107+
"resources": {"vcpus": 1, "memory": 128, "seconds": 30},
108+
"requirements": {"cpu": {"architecture": "x86_64"}},
109+
"rootfs": {
110+
"parent": {
111+
"ref": "549ec451d9b099cad112d4aaa2c00ac40fb6729a92ff252ff22eef0b5c3cb613",
112+
"use_latest": True,
113+
},
114+
"persistence": "host",
115+
"name": "test-rootfs",
116+
"size_mib": 20 * 1024,
117+
},
118+
"authorized_keys": [
119+
"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIGULT6A41Msmw2KEu0R9MvUjhuWNAsbdeZ0DOwYbt4Qt user@example",
120+
"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIH0jqdc5dmt75QhTrWqeHDV9xN8vxbgFyOYs2fuQl7CI",
121+
],
122+
"volumes": [
123+
{
124+
"comment": "Python libraries. Read-only since a 'ref' is specified.",
125+
"mount": "/opt/venv",
126+
"ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51",
127+
"use_latest": False,
128+
},
129+
{
130+
"comment": "Ephemeral storage, read-write but will not persist after the VM stops",
131+
"mount": "/var/cache",
132+
"ephemeral": True,
133+
"size_mib": 5,
134+
},
135+
{
136+
"comment": "Working data persisted on the VM supervisor, not available on other nodes",
137+
"mount": "/var/lib/sqlite",
138+
"name": "sqlite-data",
139+
"persistence": "host",
140+
"size_mib": 10,
141+
},
142+
{
143+
"comment": "Working data persisted on the Aleph network. "
144+
"New VMs will try to use the latest version of this volume, "
145+
"with no guarantee against conflicts",
146+
"mount": "/var/lib/statistics",
147+
"name": "statistics",
148+
"persistence": "store",
149+
"size_mib": 10,
150+
},
151+
{
152+
"comment": "Raw drive to use by a process, do not mount it",
153+
"name": "raw-data",
154+
"persistence": "host",
155+
"size_mib": 10,
156+
},
157+
],
158+
"time": 1619017773.8950517,
159+
}
160+
161+
pending_message = PendingMessageDb(
162+
item_hash="734a1287a2b7b5be060312ff5b05ad1bcf838950492e3428f2ac6437a1acad26",
163+
type=MessageType.instance,
164+
chain=Chain.ETH,
165+
sender="0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba",
166+
signature=None,
167+
item_type=ItemType.inline,
168+
item_content=json.dumps(content),
169+
time=timestamp_to_datetime(1619017773.8950577),
170+
channel=None,
171+
reception_time=timestamp_to_datetime(1619017774),
172+
fetched=True,
173+
check_message=False,
174+
retries=0,
175+
next_attempt=dt.datetime(2023, 1, 1),
176+
)
177+
with session_factory() as session:
178+
session.add(pending_message)
179+
session.add(
180+
MessageStatusDb(
181+
item_hash=pending_message.item_hash,
182+
status=MessageStatus.PENDING,
183+
reception_time=pending_message.reception_time,
184+
)
185+
)
186+
session.commit()
187+
188+
return pending_message
189+
190+
191+
def test_get_total_cost_for_address(
192+
session_factory: DbSessionFactory, fixture_instance_message
193+
):
194+
with session_factory() as session:
195+
session.add(
196+
AlephBalanceDb(
197+
address="0xB68B9D4f3771c246233823ed1D3Add451055F9Ef",
198+
chain=Chain.ETH,
199+
dapp=None,
200+
balance=Decimal(100_000),
201+
eth_height=0,
202+
)
203+
)
204+
insert_volume_refs(session, fixture_instance_message)
205+
session.commit()
206+
207+
total_cost: Decimal = get_total_cost_for_address(
208+
session=session, address=fixture_instance_message.sender
209+
)
210+
assert total_cost == Decimal(6)

0 commit comments

Comments
 (0)