diff --git a/README.md b/README.md
index f6a278eb..e3415b4b 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# PyLander
-
-
+
+
👋 Welcome to Open Source Calendar built with Python. 🐍
@@ -39,5 +39,11 @@ cp app/config.py.example app/configuration.py
# Edit the variables' values.
uvicorn app.main:app --reload
```
+### Running tests
+```shell
+python -m pytest --cov-report term-missing --cov=app tests
+```
+
## Contributing
View [contributing guidelines](https://github.com/PythonFreeCourse/calendar/blob/master/CONTRIBUTING.md).
+
diff --git a/app/routers/event.py b/app/routers/event.py
index 0240066c..fd460583 100644
--- a/app/routers/event.py
+++ b/app/routers/event.py
@@ -3,16 +3,16 @@
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Request
+from loguru import logger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from starlette import status
from starlette.responses import RedirectResponse
-from starlette.status import HTTP_302_FOUND
from app.database.database import get_db
from app.database.models import Event, User, UserEvent
-from app.dependencies import logger, templates
+from app.dependencies import templates
from app.internal.event import validate_zoom_link
from app.internal.utils import create_model
from app.routers.user import create_user
@@ -53,18 +53,13 @@ async def create_new_event(request: Request, session=Depends(get_db)):
location)
return RedirectResponse(router.url_path_for('eventview',
event_id=event.id),
- status_code=HTTP_302_FOUND)
+ status_code=status.HTTP_302_FOUND)
@router.get("/{event_id}")
async def eventview(request: Request, event_id: int,
db: Session = Depends(get_db)):
- try:
- event = get_event_by_id(db, event_id)
- except NoResultFound:
- raise HTTPException(status_code=404, detail="Event not found")
- except MultipleResultsFound:
- raise HTTPException(status_code=500, detail="Multiple events found")
+ event = by_id(db, event_id)
start_format = '%A, %d/%m/%Y %H:%M'
end_format = ('%H:%M' if event.start.date() == event.end.date()
else start_format)
@@ -74,102 +69,116 @@ async def eventview(request: Request, event_id: int,
"end_format": end_format})
-@router.delete("/{event_id}")
-def delete_event(request: Request, event_id: int,
- db: Session = Depends(get_db)):
- # TODO: Check if the user is the owner of the event.
- try:
- event = get_event_by_id(db, event_id)
- except NoResultFound:
- raise HTTPException(status_code=404, detail="Event not found")
- except MultipleResultsFound:
- raise HTTPException(status_code=500, detail="Multiple events found")
-
- participants = get_participants_emails_by_event(db, event_id)
-
- try:
- db.delete(event)
- db.query(UserEvent).filter_by(event_id=event_id).delete()
- db.commit()
- except (SQLAlchemyError, TypeError):
- return templates.TemplateResponse(
- "event/eventview.html", {"request": request, "event_id": event_id},
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
-
- if participants and event.start > datetime.now():
- pass
- # TODO: Send them a cancellation notice
- # if the deletion is successful
- return RedirectResponse(
- url="/calendar", status_code=status.HTTP_200_OK)
+UPDATE_EVENTS_FIELDS = {
+ 'title': str,
+ 'start': datetime,
+ 'end': datetime,
+ 'content': (str, type(None)),
+ 'location': (str, type(None))
+}
-def get_event_by_id(db: Session, event_id: int) -> Event:
- """Gets a single event by id"""
+def by_id(db: Session, event_id: int) -> Event:
+ """Get a single event by id"""
if not isinstance(db, Session):
- raise AttributeError(
+ error_message = (
f'Could not connect to database. '
f'db instance type received: {type(db)}')
+ logger.critical(error_message)
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=error_message)
+
try:
event = db.query(Event).filter_by(id=event_id).one()
except NoResultFound:
- raise NoResultFound(f"Event ID does not exist. ID: {event_id}")
+ error_message = f"Event ID does not exist. ID: {event_id}"
+ logger.exception(error_message)
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=error_message)
except MultipleResultsFound:
error_message = (
f'Multiple results found when getting event. Expected only one. '
f'ID: {event_id}')
logger.critical(error_message)
- raise MultipleResultsFound(error_message)
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=error_message)
return event
-def is_date_before(start_date: datetime, end_date: datetime) -> bool:
+def is_end_date_before_start_date(
+ start_date: datetime, end_date: datetime) -> bool:
"""Check if the start date is earlier than the end date"""
- return start_date < end_date
-
+ return start_date > end_date
-def is_it_possible_to_change_dates(old_event: Event,
- event: Dict[str, Any]) -> bool:
- return is_date_before(
- event.get('start', old_event.start),
- event.get('end', old_event.end))
-
-
-def get_items_that_can_be_updated(event: Dict[str, Any]) -> Dict[str, Any]:
- """Extract only that keys to update"""
-
- return {i: event[i] for i in (
- 'title', 'start', 'end', 'content', 'location') if i in event}
-
-
-def update_event(event_id: int, event: Dict, db: Session
- ) -> Optional[Event]:
- # TODO Check if the user is the owner of the event.
- event_to_update = get_items_that_can_be_updated(event)
- if not event_to_update:
- return None
+def check_change_dates_allowed(
+ old_event: Event, event: Dict[str, Any]):
+ allowed = 1
try:
- old_event = get_event_by_id(db, event_id)
- except NoResultFound:
- raise HTTPException(status_code=404, detail="Event not found")
- except MultipleResultsFound:
- raise HTTPException(status_code=500, detail="Multiple events found")
+ start_date = event.get('start', old_event.start)
+ end_date = event.get('end', old_event.end)
+ if is_end_date_before_start_date(start_date, end_date):
+ allowed = 0
+ except TypeError:
+ allowed = 0
+ if allowed == 0:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Invalid times")
+
+
+def is_fields_types_valid(to_check: Dict[str, Any], types: Dict[str, Any]):
+ """validate dictionary values by dictionary of types"""
+ errors = []
+ for field_name, field_type in to_check.items():
+ if types[field_name] and not isinstance(field_type, types[field_name]):
+ errors.append(
+ f"{field_name} is '{type(field_type).__name__}' and"
+ + f"it should be from type '{types[field_name].__name__}'")
+ logger.warning(errors)
+ if errors:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST, detail=errors)
+
+
+def get_event_with_editable_fields_only(event: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Remove all keys that are not allowed to update"""
+
+ return {i: event[i] for i in UPDATE_EVENTS_FIELDS if i in event}
+
+
+def _update_event(db: Session, event_id: int, event_to_update: Dict) -> Event:
try:
- if not is_it_possible_to_change_dates(old_event, event_to_update):
- return None
-
# Update database
db.query(Event).filter(Event.id == event_id).update(
event_to_update, synchronize_session=False)
+
db.commit()
+ return by_id(db, event_id)
+ except (AttributeError, SQLAlchemyError) as e:
+ logger.exception(str(e))
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Internal server error")
- # TODO: Send emails to recipients.
- except (AttributeError, SQLAlchemyError, TypeError):
- return None
- return get_event_by_id(db=db, event_id=event_id)
+def update_event(event_id: int, event: Dict, db: Session
+ ) -> Optional[Event]:
+ # TODO Check if the user is the owner of the event.
+ old_event = by_id(db, event_id)
+ event_to_update = get_event_with_editable_fields_only(event)
+ is_fields_types_valid(event_to_update, UPDATE_EVENTS_FIELDS)
+ check_change_dates_allowed(old_event, event_to_update)
+ if not event_to_update:
+ return None
+ event_updated = _update_event(db, event_id, event_to_update)
+ # TODO: Send emails to recipients.
+ return event_updated
def create_event(db, title, start, end, owner_id, content=None, location=None):
@@ -203,10 +212,42 @@ def get_participants_emails_by_event(db: Session, event_id: int) -> List[str]:
"""Returns a list of all the email address of the event invited users,
by event id."""
- return (
- [email[0] for email in db.query(User.email).
+ return [email[0] for email in db.query(User.email).
select_from(Event).
join(UserEvent, UserEvent.event_id == Event.id).
join(User, User.id == UserEvent.user_id).
filter(Event.id == event_id).
- all()])
+ all()]
+
+
+def _delete_event(db: Session, event: Event):
+ try:
+ # Delete event
+ db.delete(event)
+
+ # Delete user_event
+ db.query(UserEvent).filter(UserEvent.event_id == event.id).delete()
+
+ db.commit()
+
+ except (SQLAlchemyError, AttributeError) as e:
+ logger.exception(str(e))
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Deletion failed")
+
+
+@router.delete("/{event_id}")
+def delete_event(event_id: int,
+ db: Session = Depends(get_db)):
+
+ # TODO: Check if the user is the owner of the event.
+ event = by_id(db, event_id)
+ participants = get_participants_emails_by_event(db, event_id)
+ _delete_event(db, event)
+ if participants and event.start > datetime.now():
+ pass
+ # TODO: Send them a cancellation notice
+ # if the deletion is successful
+ return RedirectResponse(
+ url="/calendar", status_code=status.HTTP_200_OK)
diff --git a/requirements.txt b/requirements.txt
index 10ce513a..668d2a03 100644
Binary files a/requirements.txt and b/requirements.txt differ
diff --git a/tests/test_event.py b/tests/test_event.py
index c47c2419..aea49255 100644
--- a/tests/test_event.py
+++ b/tests/test_event.py
@@ -1,13 +1,13 @@
from datetime import datetime
-from fastapi import HTTPException
import pytest
-from sqlalchemy.orm.exc import NoResultFound
+from fastapi import HTTPException
from starlette import status
-from starlette.status import HTTP_302_FOUND
from app.database.models import Event
-from app.routers.event import get_event_by_id, update_event
+from app.routers.event import (_delete_event, by_id, delete_event,
+ check_change_dates_allowed, update_event,
+ _update_event)
CORRECT_EVENT_FORM_DATA = {
'title': 'test title',
@@ -37,8 +37,12 @@
'privacy': 'public'
}
-INVALID_UPDATE_OPTIONS = [
- {}, {"test": "test"}, {"start": "20.01.2020"},
+NONE_UPDATE_OPTIONS = [
+ {}, {"test": "test"},
+]
+
+INVALID_FIELD_UPDATE = [
+ {"start": "20.01.2020"},
{"start": datetime(2020, 2, 2), "end": datetime(2020, 1, 1)},
{"start": datetime(2030, 2, 2)}, {"end": datetime(1990, 1, 1)},
]
@@ -66,7 +70,7 @@ def test_eventedit_post_correct(client, user):
response = client.post(client.app.url_path_for('create_new_event'),
data=CORRECT_EVENT_FORM_DATA)
assert response.ok
- assert response.status_code == HTTP_302_FOUND
+ assert response.status_code == status.HTTP_302_FOUND
assert (client.app.url_path_for('eventview', event_id=1).strip('1')
in response.headers['location'])
@@ -77,12 +81,28 @@ def test_eventedit_post_wrong(client, user):
assert response.json()['detail'] == 'VC type with no valid zoom link'
-@pytest.mark.parametrize("data", INVALID_UPDATE_OPTIONS)
+@pytest.mark.parametrize("data", NONE_UPDATE_OPTIONS)
def test_invalid_update(event, data, session):
assert update_event(event_id=event.id,
event=data, db=session) is None
+@pytest.mark.parametrize("data", INVALID_FIELD_UPDATE)
+def test_invalid_fields(event, data, session):
+ with pytest.raises(HTTPException):
+ response = update_event(event_id=event.id, event=data, db=session)
+ assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+
+def test_not_check_change_dates_allowed(event):
+ data = {"start": "20.01.2020"}
+ with pytest.raises(HTTPException):
+ assert (
+ check_change_dates_allowed(event, data).status_code ==
+ status.HTTP_400_BAD_REQUEST
+ )
+
+
def test_successful_update(event, session):
data = {
"title": "successful",
@@ -95,9 +115,12 @@ def test_successful_update(event, session):
def test_update_db_close(event):
- data = {"title": "Problem connecting to db", }
- with pytest.raises(AttributeError):
- update_event(event_id=event.id, event=data, db=None)
+ data = {"title": "Problem connecting to db in func update_event", }
+ with pytest.raises(HTTPException):
+ assert (
+ update_event(event_id=event.id, event=data, db=None).status_code ==
+ status.HTTP_500_INTERNAL_SERVER_ERROR
+ )
def test_update_event_does_not_exist(event, session):
@@ -105,38 +128,52 @@ def test_update_event_does_not_exist(event, session):
"content": "An update test for an event does not exist"
}
with pytest.raises(HTTPException):
- response = update_event(event_id=500, event=data, db=session)
+ response = update_event(event_id=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ event=data, db=session)
assert response.status_code == status.HTTP_404_NOT_FOUND
+def test_db_close_update(session, event):
+ data = {"title": "Problem connecting to db in func _update_event", }
+ with pytest.raises(HTTPException):
+ assert (
+ _update_event(
+ event_id=event.id,
+ event_to_update=data,
+ db=None).status_code ==
+ status.HTTP_500_INTERNAL_SERVER_ERROR
+ )
+
+
def test_repr(event):
assert event.__repr__() == f''
+def test_no_connection_to_db_in_delete(event):
+ with pytest.raises(HTTPException):
+ response = delete_event(event_id=1, db=None)
+ assert (
+ response.status_code ==
+ status.HTTP_500_INTERNAL_SERVER_ERROR
+ )
+
+
+def test_no_connection_to_db_in_internal_deletion(event):
+ with pytest.raises(HTTPException):
+ assert (
+ _delete_event(event=event, db=None).status_code ==
+ status.HTTP_500_INTERNAL_SERVER_ERROR
+ )
+
+
def test_successful_deletion(event_test_client, session, event):
response = event_test_client.delete("/event/1")
assert response.ok
- with pytest.raises(NoResultFound):
- get_event_by_id(db=session, event_id=1)
+ with pytest.raises(HTTPException):
+ assert "Event ID does not exist. ID: 1" in by_id(
+ db=session, event_id=1).content
-def test_delete_failed(event_test_client, event):
+def test_deleting_an_event_does_not_exist(event_test_client, event):
response = event_test_client.delete("/event/2")
assert response.status_code == status.HTTP_404_NOT_FOUND
-
-
-def test_get_event_by_valid_id(session, event):
- event_id = event.id
- result = get_event_by_id(db=session, event_id=event_id)
- expected_type = Event
- assert type(result) == expected_type, \
- f'get_event_by_id returned unexpected type. ' \
- f'Expected: {expected_type}, Actual: {type(result)}'
- assert result.id == event_id, 'get_event_by_id returned the wrong event'
-
-
-def test_get_event_by_unexisting_id(session):
- event_id = 2
- with pytest.raises(NoResultFound) as excinfo:
- get_event_by_id(db=session, event_id=event_id)
- assert 'Event ID does not exist.' in str(excinfo)