Skip to content

Commit eea3669

Browse files
authored
Merge pull request #57 from ActivityWatch/dev/sqlite
SQlite datastore
2 parents 37d850e + 3531010 commit eea3669

File tree

14 files changed

+377
-81
lines changed

14 files changed

+377
-81
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ build:
1212
test:
1313
python3 -m pytest tests -v --cov=aw_core --cov=aw_datastore --cov=aw_transform --cov=aw_analysis
1414

15+
coverage_html: test
16+
python3 -m coverage html -d coverage_html
17+
1518
benchmark:
1619
python3 -m aw_datastore.benchmark
1720

aw_datastore/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
from typing import Dict, Callable, Any
22
import platform as _platform
33

4+
from .migration import check_for_migration
5+
46
from . import storages
57
from .datastore import Datastore
68

7-
89
# The Callable[[Any], ...] here should be Callable[..., ...] but Python 3.5.2 doesn't
910
# like ellipsises. See here: https://github.com/python/typing/issues/259
1011
def get_storage_methods() -> Dict[str, Callable[[Any], storages.AbstractStorage]]:
11-
from .storages import MemoryStorage, MongoDBStorage, PeeweeStorage
12+
from .storages import MemoryStorage, MongoDBStorage, PeeweeStorage, SqliteStorage
1213
methods = {
1314
PeeweeStorage.sid: PeeweeStorage,
1415
MemoryStorage.sid: MemoryStorage,
16+
SqliteStorage.sid: SqliteStorage,
1517
} # type: Dict[str, Callable[[Any], storages.AbstractStorage]]
1618

1719
# MongoDB is not supported on Windows or macOS

aw_datastore/benchmark.py

100644100755
Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#!/usr/bin/env python3
12
import sys
23
from typing import Callable
34
from datetime import datetime, timedelta, timezone
@@ -7,20 +8,30 @@
78

89
from takethetime import ttt
910

10-
from . import get_storage_methods, Datastore
11-
from .storages import AbstractStorage
11+
from aw_datastore import get_storage_methods, Datastore
12+
from aw_datastore.storages import AbstractStorage
1213

1314

1415
def create_test_events(n):
15-
now = datetime.now(timezone.utc)
16+
now = datetime.now(timezone.utc) - timedelta(days=1000)
1617

17-
events = [None] * n
18+
events = []
1819
for i in range(n):
19-
events[i] = Event(timestamp=now + i * timedelta(hours=1), data={"label": "asd"})
20+
events.append(Event(timestamp=now + i * timedelta(seconds=1), data={"label": "asd"}))
2021

2122
return events
2223

2324

25+
def create_tmpbucket(ds, num):
26+
bucket_id = "benchmark_test_bucket_{}".format(str(num))
27+
try:
28+
ds.delete_bucket(bucket_id)
29+
except KeyError:
30+
pass
31+
ds.create_bucket(bucket_id, "testingtype", "test-client", "testing-box")
32+
return bucket_id
33+
34+
2435
@contextmanager
2536
def temporary_bucket(ds):
2637
bucket_id = "test_bucket"
@@ -35,49 +46,42 @@ def temporary_bucket(ds):
3546

3647
def benchmark(storage: Callable[..., AbstractStorage]):
3748
ds = Datastore(storage, testing=True)
38-
num_events = 5 * 10**4
49+
50+
num_single_events = 50
51+
num_replace_events = 50
52+
num_bulk_events = 2 * 10**3
53+
num_events = num_single_events + num_replace_events + num_bulk_events + 1
54+
num_final_events = num_single_events + num_bulk_events + 1
55+
3956
events = create_test_events(num_events)
57+
single_events = events[:num_single_events]
58+
replace_events = events[num_single_events:num_single_events+num_replace_events]
59+
bulk_events = events[num_single_events+num_replace_events:-1]
4060

4161
print(storage.__name__)
4262

4363
with temporary_bucket(ds) as bucket:
4464
with ttt(" sum"):
45-
with ttt(" insert {} events".format(num_events)):
46-
bucket.insert(events)
65+
with ttt(" single insert {} events".format(num_single_events)):
66+
for event in single_events:
67+
bucket.insert(event)
68+
69+
with ttt(" bulk insert {} events".format(num_bulk_events)):
70+
bucket.insert(bulk_events)
71+
72+
with ttt(" replace last {}".format(num_replace_events)):
73+
for e in replace_events:
74+
bucket.replace_last(e)
4775

4876
with ttt(" insert 1 event"):
4977
bucket.insert(events[-1])
5078

5179
with ttt(" get one"):
5280
events_tmp = bucket.get(limit=1)
53-
# print("Total number of events: {}".format(len(events)))
5481

5582
with ttt(" get all"):
56-
events_tmp = bucket.get(limit=num_events)
57-
print(len(events_tmp))
58-
assert len(events_tmp) == num_events
59-
for e1, e2 in zip(events, sorted(events_tmp, key=lambda e: e.timestamp)):
60-
try:
61-
# Can't do direct comparison since tz will differ in object type (but have identical meaning)
62-
# TODO: Fix the above by overriding __eq__ on Event
63-
assert e1.timestamp.second == e2.timestamp.second
64-
assert e1.timestamp.microsecond == e2.timestamp.microsecond
65-
except AssertionError as e:
66-
print(e1)
67-
print(e2)
68-
raise e
69-
# print("Total number of events: {}".format(len(events)))
70-
71-
def events_in_interval(n):
72-
with ttt(" get {} events within time interval".format(n)):
73-
events_tmp = bucket.get(limit=num_events,
74-
starttime=events[0].timestamp,
75-
endtime=events[n].timestamp)
76-
assert len(events_tmp) == n - 1
77-
# print("Events within time interval: {}".format(len(events)))
78-
79-
events_in_interval(int(num_events / 2))
80-
events_in_interval(10)
83+
events_tmp = bucket.get()
84+
assert len(events_tmp) == num_final_events
8185

8286

8387
if __name__ == "__main__":

aw_datastore/datastore.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,12 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
8484
"""
8585
# NOTE: Should we keep the timestamp checking?
8686
# Get last event for timestamp check after insert
87+
"""
8788
last_event_list = self.get(1)
8889
last_event = None
8990
if len(last_event_list) > 0:
9091
last_event = last_event_list[0]
92+
"""
9193

9294
now = datetime.now(tz=timezone.utc)
9395

@@ -99,7 +101,7 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
99101
if events.timestamp + events.duration > now:
100102
self.logger.warning("Event inserted into bucket {} reaches into the future. Current UTC time: {}. Event data: {}".format(self.bucket_id, str(now), str(events)))
101103
inserted = self.ds.storage_strategy.insert_one(self.bucket_id, events)
102-
assert inserted
104+
#assert inserted
103105
elif isinstance(events, list):
104106
if len(events) > 0:
105107
oldest_event = sorted(events, key=lambda k: k['timestamp'])[0]
@@ -113,11 +115,13 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
113115
raise TypeError
114116

115117
# Warn if timestamp is older than last event
118+
"""
116119
if last_event and oldest_event:
117120
if oldest_event.timestamp < last_event.timestamp:
118121
self.logger.warning("Inserting event that has a older timestamp than previous event!" +
119122
"\nPrevious:" + str(last_event) +
120123
"\nInserted:" + str(oldest_event))
124+
"""
121125

122126
return inserted
123127

aw_datastore/migration.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from typing import Optional, List
2+
import os
3+
import re
4+
import logging
5+
6+
from aw_core.dirs import get_data_dir
7+
from .storages import AbstractStorage
8+
9+
logger = logging.getLogger(__name__)
10+
11+
def detect_db_files(data_dir: str, datastore_name: str = None, version=None) -> List[str]:
12+
db_files = [filename for filename in os.listdir(data_dir)]
13+
if datastore_name:
14+
db_files = [filename for filename in db_files if filename.split(".")[0] == datastore_name]
15+
if version:
16+
db_files = [filename for filename in db_files if filename.split(".")[1] == "v{}".format(version)]
17+
return db_files
18+
19+
def check_for_migration(datastore: AbstractStorage, datastore_name: str, version: int):
20+
data_dir = get_data_dir("aw-server")
21+
22+
if datastore.sid == "sqlite":
23+
peewee_type = "peewee-sqlite"
24+
peewee_name = peewee_type + "-testing" if datastore.testing else ""
25+
# Migrate from peewee v2
26+
peewee_db_v2 = detect_db_files(data_dir, peewee_name, 2)
27+
if len(peewee_db_v2) > 0:
28+
peewee_v2_to_sqlite_v1(datastore)
29+
30+
def peewee_v2_to_sqlite_v1(datastore):
31+
logger.info("Migrating database from peewee v2 to sqlite v1")
32+
from .storages import PeeweeStorage
33+
pw_db = PeeweeStorage(datastore.testing)
34+
# Fetch buckets and events
35+
buckets = pw_db.buckets()
36+
# Insert buckets and events to new db
37+
for bucket_id in buckets:
38+
logger.info("Migrating bucket {}".format(bucket_id))
39+
bucket = buckets[bucket_id]
40+
datastore.create_bucket(
41+
bucket["id"],
42+
bucket["type"],
43+
bucket["client"],
44+
bucket["hostname"],
45+
bucket["created"],
46+
bucket["name"]
47+
)
48+
bucket_events = pw_db.get_events(bucket_id, -1)
49+
datastore.insert_many(bucket_id, bucket_events)
50+
logger.info("Migration of peewee v2 to sqlite v1 finished")

aw_datastore/storages/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77
from .memory import MemoryStorage
88
from .mongodb import MongoDBStorage
99
from .peewee import PeeweeStorage
10+
from .sqlite import SqliteStorage

aw_datastore/storages/abstract.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class AbstractStorage(metaclass=ABCMeta):
1414

1515
@abstractmethod
1616
def __init__(self, testing: bool) -> None:
17+
self.testing = True
1718
raise NotImplementedError
1819

1920
@abstractmethod
@@ -55,7 +56,7 @@ def delete(self, bucket_id: str, event_id: int) -> bool:
5556
raise NotImplementedError
5657

5758
@abstractmethod
58-
def replace(self, bucket_id: str, event_id: int, event: Event) -> Event:
59+
def replace(self, bucket_id: str, event_id: int, event: Event) -> bool:
5960
raise NotImplementedError
6061

6162
@abstractmethod

aw_datastore/storages/memory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ def get_events(self, bucket: str, limit: int,
6363
e.append(event)
6464
events = e
6565
# Limit
66-
if limit < 0:
66+
if limit == 0:
67+
return []
68+
elif limit < 0:
6769
limit = sys.maxsize
6870
events = events[:limit]
6971
# Return

aw_datastore/storages/mongodb.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def replace_last(self, bucket_id: str, event: Event):
127127
last_event = list(self.db[bucket_id]["events"].find().sort([("timestamp", -1)]).limit(1))[0]
128128
self.db[bucket_id]["events"].replace_one({"_id": last_event["_id"]}, self._transform_event(event.copy()))
129129

130-
def replace(self, bucket_id: str, event_id, event: Event) -> Event:
130+
def replace(self, bucket_id: str, event_id, event: Event) -> bool:
131131
self.db[bucket_id]["events"].replace_one({"_id": event_id}, self._transform_event(event.copy()))
132132
event.id = event_id
133-
return event
133+
return True

aw_datastore/storages/peewee.py

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -76,45 +76,20 @@ def json(self):
7676
}
7777

7878

79-
def detect_db_files(data_dir: str) -> List[str]:
80-
return [filename for filename in os.listdir(data_dir) if "peewee-sqlite" in filename]
81-
82-
83-
def detect_db_version(data_dir: str, max_version: Optional[int] = None) -> Optional[int]:
84-
"""Returns the most recent version number of any database file found (up to max_version)"""
85-
import re
86-
files = detect_db_files(data_dir)
87-
r = re.compile("v[0-9]+")
88-
re_matches = [r.search(filename) for filename in files]
89-
versions = [int(match.group(0)[1:]) for match in re_matches if match]
90-
if max_version:
91-
versions = [v for v in versions if v <= max_version]
92-
return max(versions) if versions else None
93-
94-
9579
class PeeweeStorage(AbstractStorage):
9680
sid = "peewee"
9781

9882
def __init__(self, testing: bool = True, filepath: str = None) -> None:
9983
data_dir = get_data_dir("aw-server")
10084

101-
# TODO: Won't work with custom filepath
102-
current_db_version = detect_db_version(data_dir, max_version=LATEST_VERSION)
103-
104-
if current_db_version is not None and current_db_version < LATEST_VERSION:
105-
# DB file found but was of an older version
106-
logger.info("Latest version database file found was of an older version")
107-
logger.info("Creating database file for new version {}".format(LATEST_VERSION))
108-
logger.warning("ActivityWatch does not currently support database migrations, new database file will be empty")
109-
11085
if not filepath:
11186
filename = 'peewee-sqlite' + ('-testing' if testing else '') + ".v{}".format(LATEST_VERSION) + '.db'
11287
filepath = os.path.join(data_dir, filename)
11388
self.db = _db
11489
self.db.init(filepath)
11590
logger.info("Using database file: {}".format(filepath))
11691

117-
# db.connect()
92+
self.db.connect()
11893

11994
self.bucket_keys = {} # type: Dict[str, int]
12095
if not BucketModel.table_exists():
@@ -157,12 +132,11 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
157132
"duration": event.duration.total_seconds(),
158133
"datastr": json.dumps(event.data)}
159134
for event in events]
160-
with self.db.atomic():
161-
# Chunking into lists of length 100 is needed here due to SQLITE_MAX_COMPOUND_SELECT
162-
# and SQLITE_LIMIT_VARIABLE_NUMBER under Windows.
163-
# See: https://github.com/coleifer/peewee/issues/948
164-
for chunk in chunks(events_dictlist, 100):
165-
EventModel.insert_many(chunk).execute()
135+
# Chunking into lists of length 100 is needed here due to SQLITE_MAX_COMPOUND_SELECT
136+
# and SQLITE_LIMIT_VARIABLE_NUMBER under Windows.
137+
# See: https://github.com/coleifer/peewee/issues/948
138+
for chunk in chunks(events_dictlist, 100):
139+
EventModel.insert_many(chunk).execute()
166140

167141
def _get_event(self, bucket_id, event_id) -> EventModel:
168142
return EventModel.select() \

0 commit comments

Comments
 (0)