Skip to content
142 changes: 142 additions & 0 deletions common/management/manage_test_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from django.contrib.auth.models import User

from apps.app_registry import APP_REGISTRY
from apps.helpers import create_instance_from_form
from common.models import UserProfile
from projects.models import Environment, Flavor, Project, ProjectTemplate
from projects.tasks import create_resources_from_template


class TestDataManager:
def __init__(self, user_data=None, project_data=None, app_data=None):
self.user_data = user_data
self.project_data = project_data
self.app_data = app_data

def create_user(self):
"""Create a regular user with optional profile information."""
if not all(key in self.user_data for key in ("username", "email", "password")):
raise ValueError("Missing required user fields")

user = User.objects.create_user(
username=self.user_data["username"], email=self.user_data["email"], password=self.user_data["password"]
)

# Optional fields
if all(field in self.user_data for field in ("first_name", "last_name")):
user.first_name = self.user_data["first_name"]
user.last_name = self.user_data["last_name"]

user.is_active = True
user.save()

# Profile creation
if all(field in self.user_data for field in ("department", "affiliation")):
user_profile = UserProfile.objects.create_user_profile(user)
user_profile.department = self.user_data["department"]
user_profile.affiliation = self.user_data["affiliation"]
user_profile.save()

return user

def create_superuser(self):
"""Create a superuser with admin privileges."""
if not all(key in self.user_data for key in ("username", "email", "password")):
raise ValueError("Missing required user fields")

user = User.objects.create_superuser(
username=self.user_data["username"], email=self.user_data["email"], password=self.user_data["password"]
)
user.is_active = True
user.save()
return user

def delete_user(self):
"""Delete user and associated profile. Returns deletion count."""
if "email" not in self.user_data:
raise ValueError("Missing email for user deletion")

user_to_delete = User.objects.filter(email__exact=self.user_data["email"])

if all(field in self.user_data for field in ("department", "affiliation")):
UserProfile.objects.filter(user__in=user_to_delete).delete()

deleted_count, _ = user_to_delete.delete()
return deleted_count

def create_project(self):
"""Create a project with associated resources."""
if not all(key in self.project_data for key in ("project_name", "project_description")):
raise ValueError("Missing required project fields")
if "email" not in self.user_data:
raise ValueError("Missing email for user selection")

user = User.objects.get(email__exact=self.user_data["email"])
project_template = ProjectTemplate.objects.get(pk=1)

project = Project.objects.create_project(
name=self.project_data["project_name"],
owner=user,
description=self.project_data["project_description"],
project_template=project_template,
status="created",
)
project.save()

create_resources_from_template(user.username, project.slug, project_template.template)

return project

def delete_project(self):
"""Delete specific project. Returns deletion count."""
if "project_name" not in self.project_data:
raise ValueError("Missing project name for deletion")
if "email" not in self.user_data:
raise ValueError("Missing email for user selection")

user = User.objects.get(email__exact=self.user_data["email"])
project_to_delete = Project.objects.filter(owner=user, name=self.project_data["project_name"])
deleted_count, _ = project_to_delete.delete()
return deleted_count

def delete_all_projects(self):
"""Delete all user's projects. Returns deletion count."""
if "email" not in self.user_data:
raise ValueError("Missing email for user selection")
user = User.objects.get(email__exact=self.user_data["email"])
projects_to_delete = Project.objects.filter(owner=user)
deleted_count, _ = projects_to_delete.delete()
return deleted_count

def create_app(self):
"""Create an application instance with validation."""
if "project_name" not in self.project_data:
raise ValueError("Missing project name for deletion")
if "email" not in self.user_data:
raise ValueError("Missing email for user selection")
user = User.objects.get(email__exact=self.user_data["email"])
project = Project.objects.filter(owner=user, name=self.project_data["project_name"]).first()
flavor = Flavor.objects.filter(project=project).first()
environment = Environment.objects.filter(project=project).first()
app_slug = self.app_data["app_slug"]
del self.app_data["app_slug"]

self.app_data["flavor"] = str(flavor.pk)
self.app_data["environment"] = str(environment.pk)

# Check if the model form tuple exists
if app_slug not in APP_REGISTRY:
raise ValueError(f"Form class not found for app slug {app_slug}")

form_class = APP_REGISTRY.get_form_class(app_slug)

# Create form
form = form_class(self.app_data, project_pk=project.pk)

if form.is_valid():
# now create app
create_instance_from_form(form, project, app_slug)
else:
raise ValueError(f"Form is invalid: {form.errors.as_data()}")

return True
195 changes: 195 additions & 0 deletions common/tests_security_test_data_population_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import json

import pytest
from django.conf import settings
from django.http import HttpResponseForbidden
from django.middleware.csrf import get_token
from django.test import RequestFactory

from common.views import (
CleanupAllTestProjectsView,
CleanupTestProjectView,
CleanupTestUserView,
PopulateTestAppView,
PopulateTestProjectView,
PopulateTestSuperUserView,
PopulateTestUserView,
)

user_data = {
"affiliation": "uu",
"department": "user-department-name",
"email": "[email protected]",
"first_name": "user-first-name",
"last_name": "user-last-name",
"password": "tesT12345@",
"username": "unit_test_user",
}

superuser_data = {
"email": "[email protected]",
"password": "tesT12345@",
"username": "unit_test_super_user",
}

project_data = {"project_name": "e2e-collections-test-proj", "project_description": "e2e-collections-test-proj-desc"}

app_data = {
"app_slug": "dashapp",
"name": "collection-app-name",
"description": "collection-app-description",
"access": "public",
"port": 8000,
"image": "ghcr.io/scilifelabdatacentre/example-dash:latest",
"source_code_url": "https://someurlthatdoesnotexist.com",
}
# Test configuration
SECURITY_CONFIGS = [
(
PopulateTestUserView,
"/devtools/populate-test-user/",
{"user_data": user_data, "project_data": project_data, "app_data": app_data},
),
(
PopulateTestSuperUserView,
"/devtools/populate-test-superuser/",
{"user_data": superuser_data, "project_data": project_data, "app_data": app_data},
),
(
PopulateTestProjectView,
"/devtools/populate-test-project/",
{"user_data": user_data, "project_data": project_data, "app_data": app_data},
),
(
PopulateTestAppView,
"/devtools/populate-test-app/",
{"user_data": user_data, "project_data": project_data, "app_data": app_data},
),
(
CleanupTestProjectView,
"/devtools/cleanup-test-project/",
{"user_data": user_data, "project_data": project_data, "app_data": app_data},
),
(
CleanupAllTestProjectsView,
"/devtools/cleanup-all-test-projects/",
{"user_data": user_data, "project_data": project_data, "app_data": app_data},
),
(
CleanupTestUserView,
"/devtools/cleanup-test-user/",
{"user_data": user_data, "project_data": project_data, "app_data": app_data},
),
]
USER_CONFIGS = [
(PopulateTestUserView, "/devtools/populate-test-user/", {"user_data": user_data}),
(PopulateTestSuperUserView, "/devtools/populate-test-superuser/", {"user_data": superuser_data}),
(CleanupTestUserView, "/devtools/cleanup-test-user/", {"user_data": user_data}),
(CleanupTestUserView, "/devtools/cleanup-test-user/", {"user_data": superuser_data}),
]


@pytest.fixture
def factory():
return RequestFactory()


@pytest.fixture
def valid_secret():
return settings.POPULATE_TEST_DATA_MANAGEMENT_VIEWS_SECRET


@pytest.fixture
def csrf_token(factory):
"""Generate a valid CSRF token"""
request = factory.get("/dummy-url/")
return get_token(request)


# Security check tests
@pytest.mark.parametrize("view_class,endpoint,payload", SECURITY_CONFIGS)
def test_development_environment_check(view_class, endpoint, payload, factory, valid_secret, csrf_token):
settings.DEBUG = False
request = factory.post(
endpoint,
json.dumps(payload),
content_type="application/json",
headers={"X-Envoy-Secret": valid_secret, "X-CSRFToken": csrf_token},
)
request.COOKIES["csrftoken"] = csrf_token
response = view_class.as_view()(request)
assert isinstance(response, HttpResponseForbidden)
assert "Test functionality disabled in production" in str(response.content)


@pytest.mark.parametrize("view_class,endpoint,payload", SECURITY_CONFIGS)
def test_invalid_secret_check(view_class, endpoint, payload, factory, csrf_token):
# Test invalid secret
settings.DEBUG = True
request = factory.post(
endpoint,
json.dumps(payload),
content_type="application/json",
headers={"X-Envoy-Secret": "invalid-secret", "X-CSRFToken": csrf_token},
)
request.COOKIES["csrftoken"] = csrf_token
response = view_class.as_view()(request)
assert isinstance(response, HttpResponseForbidden)
assert "Authorization failed" in str(response.content)


# Invalid JSON test
@pytest.mark.parametrize("view_class,endpoint,_", SECURITY_CONFIGS)
def test_invalid_json_check(view_class, endpoint, _, factory, valid_secret, csrf_token):
settings.DEBUG = True
request = factory.post(
endpoint,
"invalid{json",
content_type="application/json",
headers={"X-Envoy-Secret": valid_secret, "X-CSRFToken": csrf_token},
)
request.COOKIES["csrftoken"] = csrf_token
response = view_class.as_view()(request)
assert response.status_code == 400
assert json.loads(response.content)["error"] == "Invalid JSON format"


@pytest.mark.parametrize("view_class,endpoint,payload", SECURITY_CONFIGS)
def test_csrf_protection_check(view_class, endpoint, payload, factory, valid_secret, csrf_token):
settings.DEBUG = True

# Test missing CSRF token
request = factory.post(
endpoint, json.dumps(payload), content_type="application/json", headers={"X-Envoy-Secret": valid_secret}
)
response = view_class.as_view()(request)
assert response.status_code == 403
assert "CSRF" in str(response.content)

# Test invalid CSRF token
request = factory.post(
endpoint,
json.dumps(payload),
content_type="application/json",
headers={"X-Envoy-Secret": valid_secret, "X-CSRFToken": "invalid-token"},
)
request.COOKIES["csrftoken"] = csrf_token # Cookie/header mismatch
response = view_class.as_view()(request)
assert response.status_code == 403
assert "CSRF" in str(response.content)


@pytest.mark.django_db
@pytest.mark.parametrize("view_class,endpoint,payload", USER_CONFIGS)
def test_valid_user(view_class, endpoint, payload, factory, valid_secret, csrf_token):
settings.DEBUG = True

request = factory.post(
endpoint,
json.dumps(payload),
content_type="application/json",
headers={"X-Envoy-Secret": valid_secret, "X-CSRFToken": csrf_token},
)
request.COOKIES["csrftoken"] = csrf_token
response = view_class.as_view()(request)
assert response.status_code == 200
22 changes: 22 additions & 0 deletions common/urls.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from django.conf import settings
from django.contrib.auth import views as auth_views
from django.urls import include, path

from . import views

# from .views import PopulateTestDataView, CleanupTestDataView

app_name = "common"

urlpatterns = [
Expand All @@ -13,3 +16,22 @@
path("password-change/", views.ChangePasswordView.as_view(), name="password-change"),
path("admin_profile_edit_disabled/", views.EditProfileView.as_view(), name="admin_profile_edit_disabled"),
]

if settings.DEBUG:
urlpatterns += [
path("devtools/populate-test-user/", views.PopulateTestUserView.as_view(), name="populate-test-user"),
path(
"devtools/populate-test-superuser/",
views.PopulateTestSuperUserView.as_view(),
name="populate-test-superuser",
),
path("devtools/cleanup-test-user/", views.CleanupTestUserView.as_view(), name="cleanup-test-user"),
path("devtools/populate-test-project/", views.PopulateTestProjectView.as_view(), name="populate-test-project"),
path("devtools/cleanup-test-project/", views.CleanupTestProjectView.as_view(), name="cleanup-test-project"),
path(
"devtools/cleanup-all-test-projects/",
views.CleanupAllTestProjectsView.as_view(),
name="cleanup-all-test-projects",
),
path("devtools/populate-test-app/", views.PopulateTestAppView.as_view(), name="populate-test-app"),
]
Loading