Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0eab2d0
Experimental sqlite support
johan-bjareholt May 3, 2018
2f96b04
sqlite optimizations such as lazy-commit
johan-bjareholt May 10, 2018
71995f2
datastore: Added replace_last benchmark
johan-bjareholt May 10, 2018
15d4c7f
datastore: misc cleanups in storages
johan-bjareholt May 10, 2018
e2960e7
datastore: fixed broken bucketinfo and replace_last in sqlite
johan-bjareholt May 11, 2018
82256c5
datastore: Fixed replace_last in sqlite not working
johan-bjareholt May 11, 2018
fa8beb1
datastore: replaced duration with endtime in sqlite
johan-bjareholt May 12, 2018
963bca6
datastore: peewee->sqlite migration and sqlite optimizations
johan-bjareholt May 13, 2018
c74f957
Fixed types
johan-bjareholt May 13, 2018
c9cd5de
sqlite storage: made max timestamp value signed 8-bytes
johan-bjareholt May 14, 2018
b15ada2
Added coverage_html target in Makefile
johan-bjareholt May 25, 2018
ef3dd0b
Merge remote-tracking branch 'origin/master' into dev/sqlite
johan-bjareholt May 25, 2018
9b6469e
fixed limit_events test
johan-bjareholt May 25, 2018
ef11c36
datastore: Removed WAL from sqlite
johan-bjareholt May 26, 2018
d73bd82
Merge remote-tracking branch 'origin/master' into dev/sqlite
johan-bjareholt Jun 17, 2018
b8479da
Merge remote-tracking branch 'origin/master' into dev/sqlite
johan-bjareholt Jun 30, 2018
ebd3a5e
sqlite: Added conditional_commit, WAL and new indexes
johan-bjareholt Jun 30, 2018
c319e29
Merge remote-tracking branch 'origin/master' into dev/sqlite
johan-bjareholt Sep 22, 2018
72096f4
datastore: events foreign key to buckets is now an integer instead of …
johan-bjareholt Sep 23, 2018
87a0594
sqlite: fixed index names
johan-bjareholt Oct 4, 2018
caa7597
datastore: Added datastr to bucket table in sqlite
johan-bjareholt Nov 2, 2018
3531010
Merge remote-tracking branch 'origin/master' into dev/sqlite
johan-bjareholt Jan 23, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ build:
test:
python3 -m pytest tests -v --cov=aw_core --cov=aw_datastore --cov=aw_transform --cov=aw_analysis

coverage_html: test
python3 -m coverage html -d coverage_html

benchmark:
python3 -m aw_datastore.benchmark

Expand Down
6 changes: 4 additions & 2 deletions aw_datastore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from typing import Dict, Callable, Any
import platform as _platform

from .migration import check_for_migration

from . import storages
from .datastore import Datastore


# The Callable[[Any], ...] here should be Callable[..., ...] but Python 3.5.2 doesn't
# like ellipsises. See here: https://github.com/python/typing/issues/259
def get_storage_methods() -> Dict[str, Callable[[Any], storages.AbstractStorage]]:
from .storages import MemoryStorage, MongoDBStorage, PeeweeStorage
from .storages import MemoryStorage, MongoDBStorage, PeeweeStorage, SqliteStorage
methods = {
PeeweeStorage.sid: PeeweeStorage,
MemoryStorage.sid: MemoryStorage,
SqliteStorage.sid: SqliteStorage,
} # type: Dict[str, Callable[[Any], storages.AbstractStorage]]

# MongoDB is not supported on Windows or macOS
Expand Down
72 changes: 38 additions & 34 deletions aw_datastore/benchmark.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import sys
from typing import Callable
from datetime import datetime, timedelta, timezone
Expand All @@ -7,20 +8,30 @@

from takethetime import ttt

from . import get_storage_methods, Datastore
from .storages import AbstractStorage
from aw_datastore import get_storage_methods, Datastore
from aw_datastore.storages import AbstractStorage


def create_test_events(n):
    """Return *n* test events spaced one second apart, starting 1000 days ago.

    The events are backdated so that inserting them into a bucket cannot
    trigger the datastore's "event reaches into the future" warning.

    This consolidates the interleaved old/new diff lines into the intended
    post-change implementation (append-based build, 1-second stride).
    """
    # Anchor well in the past so start + n seconds never exceeds "now".
    start = datetime.now(timezone.utc) - timedelta(days=1000)
    return [Event(timestamp=start + i * timedelta(seconds=1), data={"label": "asd"})
            for i in range(n)]


def create_tmpbucket(ds, num):
    """Ensure a fresh benchmark bucket exists in *ds* and return its id.

    Any pre-existing bucket with the same id is deleted first, so repeated
    benchmark runs always start from an empty bucket.
    """
    bid = "benchmark_test_bucket_{}".format(str(num))
    try:
        ds.delete_bucket(bid)
    except KeyError:
        # No leftover bucket from a previous run -- nothing to clean up.
        pass
    ds.create_bucket(bid, "testingtype", "test-client", "testing-box")
    return bid


@contextmanager
def temporary_bucket(ds):
bucket_id = "test_bucket"
Expand All @@ -35,49 +46,42 @@ def temporary_bucket(ds):

def benchmark(storage: Callable[..., AbstractStorage]):
ds = Datastore(storage, testing=True)
num_events = 5 * 10**4

num_single_events = 50
num_replace_events = 50
num_bulk_events = 2 * 10**3
num_events = num_single_events + num_replace_events + num_bulk_events + 1
num_final_events = num_single_events + num_bulk_events + 1

events = create_test_events(num_events)
single_events = events[:num_single_events]
replace_events = events[num_single_events:num_single_events+num_replace_events]
bulk_events = events[num_single_events+num_replace_events:-1]

print(storage.__name__)

with temporary_bucket(ds) as bucket:
with ttt(" sum"):
with ttt(" insert {} events".format(num_events)):
bucket.insert(events)
with ttt(" single insert {} events".format(num_single_events)):
for event in single_events:
bucket.insert(event)

with ttt(" bulk insert {} events".format(num_bulk_events)):
bucket.insert(bulk_events)

with ttt(" replace last {}".format(num_replace_events)):
for e in replace_events:
bucket.replace_last(e)

with ttt(" insert 1 event"):
bucket.insert(events[-1])

with ttt(" get one"):
events_tmp = bucket.get(limit=1)
# print("Total number of events: {}".format(len(events)))

with ttt(" get all"):
events_tmp = bucket.get(limit=num_events)
print(len(events_tmp))
assert len(events_tmp) == num_events
for e1, e2 in zip(events, sorted(events_tmp, key=lambda e: e.timestamp)):
try:
# Can't do direct comparison since tz will differ in object type (but have identical meaning)
# TODO: Fix the above by overriding __eq__ on Event
assert e1.timestamp.second == e2.timestamp.second
assert e1.timestamp.microsecond == e2.timestamp.microsecond
except AssertionError as e:
print(e1)
print(e2)
raise e
# print("Total number of events: {}".format(len(events)))

def events_in_interval(n):
with ttt(" get {} events within time interval".format(n)):
events_tmp = bucket.get(limit=num_events,
starttime=events[0].timestamp,
endtime=events[n].timestamp)
assert len(events_tmp) == n - 1
# print("Events within time interval: {}".format(len(events)))

events_in_interval(int(num_events / 2))
events_in_interval(10)
events_tmp = bucket.get()
assert len(events_tmp) == num_final_events


if __name__ == "__main__":
Expand Down
6 changes: 5 additions & 1 deletion aw_datastore/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
"""
# NOTE: Should we keep the timestamp checking?
# Get last event for timestamp check after insert
"""
last_event_list = self.get(1)
last_event = None
if len(last_event_list) > 0:
last_event = last_event_list[0]
"""

now = datetime.now(tz=timezone.utc)

Expand All @@ -99,7 +101,7 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
if events.timestamp + events.duration > now:
self.logger.warning("Event inserted into bucket {} reaches into the future. Current UTC time: {}. Event data: {}".format(self.bucket_id, str(now), str(events)))
inserted = self.ds.storage_strategy.insert_one(self.bucket_id, events)
assert inserted
#assert inserted
elif isinstance(events, list):
if len(events) > 0:
oldest_event = sorted(events, key=lambda k: k['timestamp'])[0]
Expand All @@ -113,11 +115,13 @@ def insert(self, events: Union[Event, List[Event]]) -> Optional[Event]:
raise TypeError

# Warn if timestamp is older than last event
"""
if last_event and oldest_event:
if oldest_event.timestamp < last_event.timestamp:
self.logger.warning("Inserting event that has a older timestamp than previous event!" +
"\nPrevious:" + str(last_event) +
"\nInserted:" + str(oldest_event))
"""

return inserted

Expand Down
50 changes: 50 additions & 0 deletions aw_datastore/migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Optional, List
import os
import re
import logging

from aw_core.dirs import get_data_dir
from .storages import AbstractStorage

logger = logging.getLogger(__name__)

def detect_db_files(data_dir: str, datastore_name: Optional[str] = None, version: Optional[int] = None) -> List[str]:
    """Return database filenames found in *data_dir*, optionally filtered.

    Filenames are expected to look like ``<name>.v<version>.<ext>``.

    :param data_dir: directory to scan.
    :param datastore_name: if given (and non-empty), keep only files whose
        part before the first "." equals this name.
    :param version: if given, keep only files whose second "."-separated
        part equals ``v<version>``.
    :return: list of matching filenames (no paths).
    """
    db_files = os.listdir(data_dir)
    if datastore_name:
        db_files = [f for f in db_files if f.split(".")[0] == datastore_name]
    if version:
        # Guard against filenames with no "." — indexing [1] on those would
        # raise IndexError in the original implementation.
        wanted = "v{}".format(version)
        db_files = [f for f in db_files
                    if len(f.split(".")) > 1 and f.split(".")[1] == wanted]
    return db_files

def check_for_migration(datastore: AbstractStorage, datastore_name: str, version: int):
    """Check whether an older database should be migrated into *datastore*.

    Currently only handles the peewee-v2 -> sqlite-v1 migration: if the
    target datastore is the sqlite backend and a peewee v2 database file
    exists, its contents are copied over.

    :param datastore: the active (target) storage backend.
    :param datastore_name: unused for now; kept for interface stability.
    :param version: unused for now; kept for interface stability.
    """
    data_dir = get_data_dir("aw-server")

    if datastore.sid == "sqlite":
        peewee_type = "peewee-sqlite"
        # BUGFIX: only the "-testing" suffix is conditional. The original
        # expression `peewee_type + "-testing" if datastore.testing else ""`
        # parsed as `(peewee_type + "-testing") if ... else ""`, so in
        # non-testing mode the name became "" and the filename filter in
        # detect_db_files was silently disabled.
        peewee_name = peewee_type + ("-testing" if datastore.testing else "")
        # Migrate from peewee v2
        peewee_db_v2 = detect_db_files(data_dir, peewee_name, 2)
        if len(peewee_db_v2) > 0:
            peewee_v2_to_sqlite_v1(datastore)

def peewee_v2_to_sqlite_v1(datastore):
    """Copy every bucket, with all of its events, from the peewee v2
    database into *datastore* (the new sqlite backend)."""
    logger.info("Migrating database from peewee v2 to sqlite v1")
    from .storages import PeeweeStorage
    old_db = PeeweeStorage(datastore.testing)
    # Recreate each source bucket (with its metadata) in the target
    # datastore, then copy its full event history across.
    for bucket_id, bucket in old_db.buckets().items():
        logger.info("Migrating bucket {}".format(bucket_id))
        datastore.create_bucket(
            bucket["id"],
            bucket["type"],
            bucket["client"],
            bucket["hostname"],
            bucket["created"],
            bucket["name"]
        )
        # A limit of -1 means "no limit": fetch every event in the bucket.
        datastore.insert_many(bucket_id, old_db.get_events(bucket_id, -1))
    logger.info("Migration of peewee v2 to sqlite v1 finished")
1 change: 1 addition & 0 deletions aw_datastore/storages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
from .memory import MemoryStorage
from .mongodb import MongoDBStorage
from .peewee import PeeweeStorage
from .sqlite import SqliteStorage
3 changes: 2 additions & 1 deletion aw_datastore/storages/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class AbstractStorage(metaclass=ABCMeta):

@abstractmethod
def __init__(self, testing: bool) -> None:
self.testing = True
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -55,7 +56,7 @@ def delete(self, bucket_id: str, event_id: int) -> bool:
raise NotImplementedError

@abstractmethod
def replace(self, bucket_id: str, event_id: int, event: Event) -> Event:
def replace(self, bucket_id: str, event_id: int, event: Event) -> bool:
raise NotImplementedError

@abstractmethod
Expand Down
4 changes: 3 additions & 1 deletion aw_datastore/storages/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def get_events(self, bucket: str, limit: int,
e.append(event)
events = e
# Limit
if limit < 0:
if limit == 0:
return []
elif limit < 0:
limit = sys.maxsize
events = events[:limit]
# Return
Expand Down
4 changes: 2 additions & 2 deletions aw_datastore/storages/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def replace_last(self, bucket_id: str, event: Event):
last_event = list(self.db[bucket_id]["events"].find().sort([("timestamp", -1)]).limit(1))[0]
self.db[bucket_id]["events"].replace_one({"_id": last_event["_id"]}, self._transform_event(event.copy()))

def replace(self, bucket_id: str, event_id, event: Event) -> bool:
    """Replace the event with id *event_id* in bucket *bucket_id* by *event*.

    Resolves the interleaved diff lines to the new AbstractStorage contract:
    replace() now returns a bool instead of the stored Event (the old body
    set ``event.id`` and returned the event).

    :return: True (the MongoDB backend does not report replace failures here).
    """
    # Copy before transforming so the caller's Event object is not mutated.
    self.db[bucket_id]["events"].replace_one({"_id": event_id}, self._transform_event(event.copy()))
    return True
36 changes: 6 additions & 30 deletions aw_datastore/storages/peewee.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,42 +75,19 @@ def json(self):
}


def detect_db_files(data_dir: str) -> List[str]:
return [filename for filename in os.listdir(data_dir) if "peewee-sqlite" in filename]


def detect_db_version(data_dir: str, max_version: Optional[int] = None) -> Optional[int]:
"""Returns the most recent version number of any database file found (up to max_version)"""
import re
files = detect_db_files(data_dir)
r = re.compile("v[0-9]+")
re_matches = [r.search(filename) for filename in files]
versions = [int(match.group(0)[1:]) for match in re_matches if match]
if max_version:
versions = [v for v in versions if v <= max_version]
return max(versions) if versions else None


class PeeweeStorage(AbstractStorage):
sid = "peewee"

def __init__(self, testing):
data_dir = get_data_dir("aw-server")
current_db_version = detect_db_version(data_dir, max_version=LATEST_VERSION)

if current_db_version is not None and current_db_version < LATEST_VERSION:
# DB file found but was of an older version
logger.info("Latest version database file found was of an older version")
logger.info("Creating database file for new version {}".format(LATEST_VERSION))
logger.warning("ActivityWatch does not currently support database migrations, new database file will be empty")

filename = 'peewee-sqlite' + ('-testing' if testing else '') + ".v{}".format(LATEST_VERSION) + '.db'
filepath = os.path.join(data_dir, filename)
self.db = _db
self.db.init(filepath)
logger.info("Using database file: {}".format(filepath))

# db.connect()
self.db.connect()

self.bucket_keys = {}
if not BucketModel.table_exists():
Expand Down Expand Up @@ -153,12 +130,11 @@ def insert_many(self, bucket_id, events: List[Event], fast=False) -> None:
"duration": event.duration.total_seconds(),
"datastr": json.dumps(event.data)}
for event in events]
with self.db.atomic():
# Chunking into lists of length 100 is needed here due to SQLITE_MAX_COMPOUND_SELECT
# and SQLITE_LIMIT_VARIABLE_NUMBER under Windows.
# See: https://github.com/coleifer/peewee/issues/948
for chunk in chunks(events_dictlist, 100):
EventModel.insert_many(chunk).execute()
# Chunking into lists of length 100 is needed here due to SQLITE_MAX_COMPOUND_SELECT
# and SQLITE_LIMIT_VARIABLE_NUMBER under Windows.
# See: https://github.com/coleifer/peewee/issues/948
for chunk in chunks(events_dictlist, 100):
EventModel.insert_many(chunk).execute()

def _get_event(self, bucket_id, event_id) -> EventModel:
return EventModel.select() \
Expand Down
Loading