Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions testbed/core/forms/oauth_connection_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
class OAuthApplicationForm(forms.ModelForm):
class Meta:
model = Application
fields = ['client_id', 'client_secret', 'redirect_uris']
fields = ['name', 'client_id', 'client_secret', 'redirect_uris']
labels = {
'name': 'Service Name',
'redirect_uris': 'Redirect URL'
}
widgets = {
'name': forms.TextInput(attrs={
'class': 'form-control',
'placeholder': 'My ActivityPub Service'
}),
'client_id': forms.TextInput(attrs={'class': 'form-control', 'placeholder': 'Client ID'}),
'client_secret': forms.TextInput(attrs={'class': 'form-control', 'placeholder': 'Client Secret'}),
'redirect_uris': forms.TextInput(attrs={
Expand Down Expand Up @@ -38,7 +43,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Set default values for required fields that users don't need to see for now
if not self.instance.pk:
self.instance.name = kwargs.get('initial', {}).get('name', 'OAuth App')

self.instance.client_type = 'confidential'
self.instance.authorization_grant_type = 'authorization-code'

Expand Down
105 changes: 105 additions & 0 deletions testbed/core/tests/test_oauth_validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import pytest
from unittest.mock import MagicMock, patch

from django.contrib.auth import get_user_model
from oauth2_provider.models import get_application_model
from testbed.core.utils.oauth_validators import ActivityPubOAuth2Validator

User = get_user_model()
Application = get_application_model()


# The validator must ensure that clients request the appropriate scopes and use registered redirect URI

# Creates an instance of custom validator
@pytest.fixture
def oauth_validator():
return ActivityPubOAuth2Validator()

# Represents a client service registered with the testbed
@pytest.fixture
def oauth_application(user):
return Application.objects.create(
name='Test ActivityPub Service',
user=user,
client_type='confidential',
authorization_grant_type='authorization-code',
client_id='test-client-id',
client_secret='test-client-secret',
redirect_uris='https://example.com/callback'
)

# Simulates the client making the request
@pytest.fixture
def oauth_client():
client = MagicMock()
client.client_id = 'test-client-id'
return client

# Simulates the HTTP request in the OAuth flow
@pytest.fixture
def mock_request():
return MagicMock()

# Test that validator accepts the activitypub_account_portability scope
# The destination service must request this specific scope to indicate it wants to perform account portability operations
@pytest.mark.django_db
def test_validate_scopes_with_valid_scope(oauth_validator, oauth_application, oauth_client, mock_request):
scopes = ['activitypub_account_portability']
result = oauth_validator.validate_scopes(
oauth_application.client_id,
scopes,
oauth_client,
mock_request
)
assert result, "The validator should accept the activitypub_account_portability scope"

# Test that validator rejects empty scopes
@pytest.mark.django_db
def test_validate_scopes_with_no_scopes(oauth_validator, oauth_application, oauth_client, mock_request):
scopes = []
result = oauth_validator.validate_scopes(
oauth_application.client_id,
scopes,
oauth_client,
mock_request
)
assert not result, "The validator should reject empty scopes"

# Test that validator rejects scopes without activitypub_account_portability
# This prevents services from using our OAuth endpoints for purposes other than account portability
@pytest.mark.django_db
def test_validate_scopes_with_invalid_scopes(oauth_validator, oauth_application, oauth_client, mock_request):
scopes = ['read', 'write']
result = oauth_validator.validate_scopes(
oauth_application.client_id,
scopes,
oauth_client,
mock_request
)
assert not result, "The validator should reject scopes without activitypub_account_portability"

# Test that validator accepts a valid redirect URI
@pytest.mark.django_db
def test_validate_redirect_uri_with_valid_uri(oauth_validator, oauth_application, mock_request):
with patch.object(oauth_validator.__class__.__bases__[0], 'validate_redirect_uri', return_value=True):
result = oauth_validator.validate_redirect_uri(
oauth_application.client_id,
'https://example.com/callback',
mock_request
)
assert result, "The validator should accept a valid redirect URI"

# Test that validator rejects an invalid redirect URI
# When a user authorizes a destination service, the authorization code must only be sent to
# the destination's registered redirect URL to prevent malicious services from intercepting the flow.
@pytest.mark.django_db
def test_validate_redirect_uri_with_invalid_uri(oauth_validator, oauth_application, mock_request):

with patch.object(oauth_validator.__class__.__bases__[0], 'validate_redirect_uri', return_value=False):
result = oauth_validator.validate_redirect_uri(
oauth_application.client_id,
'https://malicious-site.com/callback',
mock_request
)
assert not result, "The validator should reject an invalid redirect URI"
10 changes: 6 additions & 4 deletions testbed/core/utils/oauth_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
Application = get_application_model()

# Get or create the single OAuth Application for a user.
# This enforces the one-application-per-user approach.
# Args: user: The Django User object
# This enforces the one-application-per-user approach where each user
# represents an ActivityPub service in the LOLA portability flow.
def get_user_application(user):

# Check if user already has an application
Expand All @@ -19,12 +19,14 @@ def get_user_application(user):
if applications.count() > 1:
# Log warning if multiple exist (shouldn't happen with our business logic)
logger.warning(f"User {user.username} has multiple OAuth applications. Using the first one.")
logger.info(f"Retrieved existing OAuth application for user {user.username}")
return application

# Create a new application with random credentials
# Create a new application with random credentials and ActivityPub-specific name
logger.info(f"Creating new ActivityPub OAuth application for user {user.username}")
return Application.objects.create(
user=user,
name=f"{user.username}'s OAuth App",
name=f"{user.username}'s ActivityPub Service",
client_id=random_client_id(),
client_secret=random_client_secret(),
redirect_uris='',
Expand Down
40 changes: 40 additions & 0 deletions testbed/core/utils/oauth_validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging
from oauth2_provider.oauth2_validators import OAuth2Validator

logger = logging.getLogger(__name__)

# Custom validator for ActivityPub-specific OAuth requirements
class ActivityPubOAuth2Validator(OAuth2Validator):

# Ensure the client is requesting valid scopes for ActivityPub portability
def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
if not scopes:
logger.warning(f"Client {client_id} requested OAuth with no scopes")
return False

# For account portability, it requires the 'activitypub_account_portability' scope
if 'activitypub_account_portability' not in scopes:
logger.warning(
f"Client {client_id} requested OAuth without 'activitypub_account_portability' scope. "
f"Scopes: {scopes}"
)
return False

logger.info(f"Client {client_id} requested valid scopes: {scopes}")
return super().validate_scopes(client_id, scopes, client, request, *args, **kwargs)

# Additional validation for redirect URIs in ActivityPub context
def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):

# Standard validation first
valid = super().validate_redirect_uri(client_id, redirect_uri, request, *args, **kwargs)

if not valid:
logger.warning(f"Client {client_id} requested invalid redirect URI: {redirect_uri}")
return False

# We could add additional validation here if needed later on
# For example, checking for HTTPS in production

logger.info(f"Client {client_id} requested valid redirect URI: {redirect_uri}")
return True
1 change: 1 addition & 0 deletions testbed/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@
'ACCESS_TOKEN_EXPIRE_SECONDS': 3600, # 1 hour
'REFRESH_TOKEN_EXPIRE_SECONDS': 86400, # 1 day
'AUTHORIZATION_CODE_EXPIRE_SECONDS': 600, # 10 minutes
'OAUTH2_VALIDATOR_CLASS': 'testbed.core.utils.oauth_validators.ActivityPubOAuth2Validator',
}

# Configure REST framework to use OAuth2 authentication
Expand Down