diff --git a/testbed/core/forms/oauth_connection_form.py b/testbed/core/forms/oauth_connection_form.py index 3e6cd01..62a3de4 100644 --- a/testbed/core/forms/oauth_connection_form.py +++ b/testbed/core/forms/oauth_connection_form.py @@ -6,11 +6,16 @@ class OAuthApplicationForm(forms.ModelForm): class Meta: model = Application - fields = ['client_id', 'client_secret', 'redirect_uris'] + fields = ['name', 'client_id', 'client_secret', 'redirect_uris'] labels = { + 'name': 'Service Name', 'redirect_uris': 'Redirect URL' } widgets = { + 'name': forms.TextInput(attrs={ + 'class': 'form-control', + 'placeholder': 'My ActivityPub Service' + }), 'client_id': forms.TextInput(attrs={'class': 'form-control', 'placeholder': 'Client ID'}), 'client_secret': forms.TextInput(attrs={'class': 'form-control', 'placeholder': 'Client Secret'}), 'redirect_uris': forms.TextInput(attrs={ @@ -38,7 +43,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Set default values for required fields that users don't need to see for now if not self.instance.pk: - self.instance.name = kwargs.get('initial', {}).get('name', 'OAuth App') + self.instance.client_type = 'confidential' self.instance.authorization_grant_type = 'authorization-code' diff --git a/testbed/core/tests/test_oauth_validators.py b/testbed/core/tests/test_oauth_validators.py new file mode 100644 index 0000000..5e2b206 --- /dev/null +++ b/testbed/core/tests/test_oauth_validators.py @@ -0,0 +1,105 @@ +import pytest +from unittest.mock import MagicMock, patch + +from django.contrib.auth import get_user_model +from oauth2_provider.models import get_application_model +from testbed.core.utils.oauth_validators import ActivityPubOAuth2Validator + +User = get_user_model() +Application = get_application_model() + + +# The validator must ensure that clients request the appropriate scopes and use registered redirect URI + +# Creates an instance of custom validator +@pytest.fixture +def oauth_validator(): + return ActivityPubOAuth2Validator() + +# Represents a client service registered with the testbed +@pytest.fixture +def oauth_application(user): + return Application.objects.create( + name='Test ActivityPub Service', + user=user, + client_type='confidential', + authorization_grant_type='authorization-code', + client_id='test-client-id', + client_secret='test-client-secret', + redirect_uris='https://example.com/callback' + ) + +# Simulates the client making the request +@pytest.fixture +def oauth_client(): + client = MagicMock() + client.client_id = 'test-client-id' + return client + +# Simulates the HTTP request in the OAuth flow +@pytest.fixture +def mock_request(): + return MagicMock() + +# Test that validator accepts the activitypub_account_portability scope +# The destination service must request this specific scope to indicate it wants to perform account portability operations +@pytest.mark.django_db +def test_validate_scopes_with_valid_scope(oauth_validator, oauth_application, oauth_client, mock_request): + scopes = ['activitypub_account_portability'] + result = oauth_validator.validate_scopes( + oauth_application.client_id, + scopes, + oauth_client, + mock_request + ) + assert result, "The validator should accept the activitypub_account_portability scope" + +# Test that validator rejects empty scopes +@pytest.mark.django_db +def test_validate_scopes_with_no_scopes(oauth_validator, oauth_application, oauth_client, mock_request): + scopes = [] + result = oauth_validator.validate_scopes( + oauth_application.client_id, + scopes, + oauth_client, + mock_request + ) + assert not result, "The validator should reject empty scopes" + +# Test that validator rejects scopes without activitypub_account_portability +# This prevents services from using our OAuth endpoints for purposes other than account portability +@pytest.mark.django_db +def test_validate_scopes_with_invalid_scopes(oauth_validator, oauth_application, oauth_client, mock_request): + scopes = ['read', 'write'] + result = oauth_validator.validate_scopes( + oauth_application.client_id, + scopes, + oauth_client, + mock_request + ) + assert not result, "The validator should reject scopes without activitypub_account_portability" + +# Test that validator accepts a valid redirect URI +@pytest.mark.django_db +def test_validate_redirect_uri_with_valid_uri(oauth_validator, oauth_application, mock_request): + with patch.object(oauth_validator.__class__.__bases__[0], 'validate_redirect_uri', return_value=True): + result = oauth_validator.validate_redirect_uri( + oauth_application.client_id, + 'https://example.com/callback', + mock_request + ) + assert result, "The validator should accept a valid redirect URI" + +# Test that validator rejects an invalid redirect URI +# When a user authorizes a destination service, the authorization code must only be sent to +# the destination's registered redirect URL to prevent malicious services from intercepting the flow. +@pytest.mark.django_db +def test_validate_redirect_uri_with_invalid_uri(oauth_validator, oauth_application, mock_request): + + with patch.object(oauth_validator.__class__.__bases__[0], 'validate_redirect_uri', return_value=False): + result = oauth_validator.validate_redirect_uri( + oauth_application.client_id, + 'https://malicious-site.com/callback', + mock_request + ) + assert not result, "The validator should reject an invalid redirect URI" diff --git a/testbed/core/utils/oauth_utils.py b/testbed/core/utils/oauth_utils.py index ddfeed4..85f4db7 100644 --- a/testbed/core/utils/oauth_utils.py +++ b/testbed/core/utils/oauth_utils.py @@ -6,8 +6,8 @@ Application = get_application_model() # Get or create the single OAuth Application for a user. -# This enforces the one-application-per-user approach. -# Args: user: The Django User object +# This enforces the one-application-per-user approach where each user +# represents an ActivityPub service in the LOLA portability flow. def get_user_application(user): # Check if user already has an application @@ -19,12 +19,14 @@ def get_user_application(user): if applications.count() > 1: # Log warning if multiple exist (shouldn't happen with our business logic) logger.warning(f"User {user.username} has multiple OAuth applications. Using the first one.") + logger.info(f"Retrieved existing OAuth application for user {user.username}") return application - # Create a new application with random credentials + # Create a new application with random credentials and ActivityPub-specific name + logger.info(f"Creating new ActivityPub OAuth application for user {user.username}") return Application.objects.create( user=user, - name=f"{user.username}'s OAuth App", + name=f"{user.username}'s ActivityPub Service", client_id=random_client_id(), client_secret=random_client_secret(), redirect_uris='', diff --git a/testbed/core/utils/oauth_validators.py b/testbed/core/utils/oauth_validators.py new file mode 100644 index 0000000..9c4b9e4 --- /dev/null +++ b/testbed/core/utils/oauth_validators.py @@ -0,0 +1,40 @@ +import logging +from oauth2_provider.oauth2_validators import OAuth2Validator + +logger = logging.getLogger(__name__) + +# Custom validator for ActivityPub-specific OAuth requirements +class ActivityPubOAuth2Validator(OAuth2Validator): + + # Ensure the client is requesting valid scopes for ActivityPub portability + def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs): + if not scopes: + logger.warning(f"Client {client_id} requested OAuth with no scopes") + return False + + # For account portability, it requires the 'activitypub_account_portability' scope + if 'activitypub_account_portability' not in scopes: + logger.warning( + f"Client {client_id} requested OAuth without 'activitypub_account_portability' scope. " + f"Scopes: {scopes}" + ) + return False + + logger.info(f"Client {client_id} requested valid scopes: {scopes}") + return super().validate_scopes(client_id, scopes, client, request, *args, **kwargs) + + # Additional validation for redirect URIs in ActivityPub context + def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs): + + # Standard validation first + valid = super().validate_redirect_uri(client_id, redirect_uri, request, *args, **kwargs) + + if not valid: + logger.warning(f"Client {client_id} requested invalid redirect URI: {redirect_uri}") + return False + + # We could add additional validation here if needed later on + # For example, checking for HTTPS in production + + logger.info(f"Client {client_id} requested valid redirect URI: {redirect_uri}") + return True diff --git a/testbed/settings/base.py b/testbed/settings/base.py index 2c80e1d..d8252ca 100644 --- a/testbed/settings/base.py +++ b/testbed/settings/base.py @@ -227,6 +227,7 @@ 'ACCESS_TOKEN_EXPIRE_SECONDS': 3600, # 1 hour 'REFRESH_TOKEN_EXPIRE_SECONDS': 86400, # 1 day 'AUTHORIZATION_CODE_EXPIRE_SECONDS': 600, # 10 minutes + 'OAUTH2_VALIDATOR_CLASS': 'testbed.core.utils.oauth_validators.ActivityPubOAuth2Validator', } # Configure REST framework to use OAuth2 authentication