Skip to content

fix(auth): Integration tests for multi-tenancy and IdP management APIs #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion firebase_admin/_auth_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def display_name(self):

@property
def enabled(self):
return self._data['enabled']
return self._data.get('enabled', False)


class OIDCProviderConfig(ProviderConfig):
Expand Down
9 changes: 9 additions & 0 deletions firebase_admin/_user_mgt.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ def custom_claims(self):
return parsed
return None

@property
def tenant_id(self):
"""Returns the tenant ID of this user.

Returns:
string: A tenant ID string or None.
"""
return self._data.get('tenantId')


class ExportedUserRecord(UserRecord):
"""Contains metadata associated with a user including password hash and salt."""
Expand Down
32 changes: 22 additions & 10 deletions firebase_admin/tenant_mgt.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Google Cloud Identity Platform (GCIP) instance.
"""

import re
import threading

import requests
Expand All @@ -31,6 +32,7 @@

_TENANT_MGT_ATTRIBUTE = '_tenant_mgt'
_MAX_LIST_TENANTS_RESULTS = 100
_DISPLAY_NAME_PATTERN = re.compile('^[a-zA-Z][a-zA-Z0-9-]{3,19}$')


__all__ = [
Expand Down Expand Up @@ -89,15 +91,16 @@ def get_tenant(tenant_id, app=None):


def create_tenant(
display_name=None, allow_password_sign_up=None, enable_email_link_sign_in=None, app=None):
display_name, allow_password_sign_up=None, enable_email_link_sign_in=None, app=None):
"""Creates a new tenant from the given options.

Args:
display_name: Display name string for the new tenant (optional).
display_name: Display name string for the new tenant. Must begin with a letter and contain
only letters, digits and hyphens. Length must be between 4 and 20.
allow_password_sign_up: A boolean indicating whether to enable or disable the email sign-in
provider.
provider (optional).
enable_email_link_sign_in: A boolean indicating whether to enable or disable email link
sign-in. Disabling this makes the password required for email sign-in.
sign-in (optional). Disabling this makes the password required for email sign-in.
app: An App instance (optional).

Returns:
Expand All @@ -120,7 +123,7 @@ def update_tenant(

Args:
tenant_id: ID of the tenant to update.
display_name: Display name string for the new tenant (optional).
display_name: Updated display name string for the tenant (optional).
allow_password_sign_up: A boolean indicating whether to enable or disable the email sign-in
provider.
enable_email_link_sign_in: A boolean indicating whether to enable or disable email link
Expand Down Expand Up @@ -269,11 +272,10 @@ def get_tenant(self, tenant_id):
return Tenant(body)

def create_tenant(
self, display_name=None, allow_password_sign_up=None, enable_email_link_sign_in=None):
self, display_name, allow_password_sign_up=None, enable_email_link_sign_in=None):
"""Creates a new tenant from the given parameters."""
payload = {}
if display_name is not None:
payload['displayName'] = _auth_utils.validate_string(display_name, 'displayName')

payload = {'displayName': _validate_display_name(display_name)}
if allow_password_sign_up is not None:
payload['allowPasswordSignup'] = _auth_utils.validate_boolean(
allow_password_sign_up, 'allowPasswordSignup')
Expand All @@ -297,7 +299,7 @@ def update_tenant(

payload = {}
if display_name is not None:
payload['displayName'] = _auth_utils.validate_string(display_name, 'displayName')
payload['displayName'] = _validate_display_name(display_name)
if allow_password_sign_up is not None:
payload['allowPasswordSignup'] = _auth_utils.validate_boolean(
allow_password_sign_up, 'allowPasswordSignup')
Expand Down Expand Up @@ -431,3 +433,13 @@ def __next__(self):

def __iter__(self):
return self


def _validate_display_name(display_name):
if not isinstance(display_name, str):
raise ValueError('Invalid type for displayName')
if not _DISPLAY_NAME_PATTERN.search(display_name):
raise ValueError(
'displayName must start with a letter and only consist of letters, digits and '
'hyphens with 4-20 characters.')
return display_name
186 changes: 186 additions & 0 deletions integration/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import base64
import datetime
import random
import string
import time
from urllib import parse
import uuid
Expand All @@ -38,6 +39,30 @@

ACTION_LINK_CONTINUE_URL = 'http://localhost?a=1&b=5#f=1'

X509_CERTIFICATES = [
('-----BEGIN CERTIFICATE-----\nMIICZjCCAc+gAwIBAgIBADANBgkqhkiG9w0BAQ0FADBQMQswCQYDVQQGEwJ1czE'
'L\nMAkGA1UECAwCQ0ExDTALBgNVBAoMBEFjbWUxETAPBgNVBAMMCGFjbWUuY29tMRIw\nEAYDVQQHDAlTdW5ueXZhbGU'
'wHhcNMTgxMjA2MDc1MTUxWhcNMjgxMjAzMDc1MTUx\nWjBQMQswCQYDVQQGEwJ1czELMAkGA1UECAwCQ0ExDTALBgNVB'
'AoMBEFjbWUxETAP\nBgNVBAMMCGFjbWUuY29tMRIwEAYDVQQHDAlTdW5ueXZhbGUwgZ8wDQYJKoZIhvcN\nAQEBBQADg'
'Y0AMIGJAoGBAKphmggjiVgqMLXyzvI7cKphscIIQ+wcv7Dld6MD4aKv\n7Jqr8ltujMxBUeY4LFEKw8Terb01snYpDot'
'filaG6NxpF/GfVVmMalzwWp0mT8+H\nyzyPj89mRcozu17RwuooR6n1ofXjGcBE86lqC21UhA3WVgjPOLqB42rlE9gPn'
'ZLB\nAgMBAAGjUDBOMB0GA1UdDgQWBBS0iM7WnbCNOnieOP1HIA+Oz/ML+zAfBgNVHSME\nGDAWgBS0iM7WnbCNOnieO'
'P1HIA+Oz/ML+zAMBgNVHRMEBTADAQH/MA0GCSqGSIb3\nDQEBDQUAA4GBAF3jBgS+wP+K/jTupEQur6iaqS4UvXd//d4'
'vo1MV06oTLQMTz+rP\nOSMDNwxzfaOn6vgYLKP/Dcy9dSTnSzgxLAxfKvDQZA0vE3udsw0Bd245MmX4+GOp\nlbrN99X'
'P1u+lFxCSdMUzvQ/jW4ysw/Nq4JdJ0gPAyPvL6Qi/3mQdIQwx\n-----END CERTIFICATE-----\n'),
('-----BEGIN CERTIFICATE-----\nMIICZjCCAc+gAwIBAgIBADANBgkqhkiG9w0BAQ0FADBQMQswCQYDVQQGEwJ1czE'
'L\nMAkGA1UECAwCQ0ExDTALBgNVBAoMBEFjbWUxETAPBgNVBAMMCGFjbWUuY29tMRIw\nEAYDVQQHDAlTdW5ueXZhbGU'
'wHhcNMTgxMjA2MDc1ODE4WhcNMjgxMjAzMDc1ODE4\nWjBQMQswCQYDVQQGEwJ1czELMAkGA1UECAwCQ0ExDTALBgNVB'
'AoMBEFjbWUxETAP\nBgNVBAMMCGFjbWUuY29tMRIwEAYDVQQHDAlTdW5ueXZhbGUwgZ8wDQYJKoZIhvcN\nAQEBBQADg'
'Y0AMIGJAoGBAKuzYKfDZGA6DJgQru3wNUqv+S0hMZfP/jbp8ou/8UKu\nrNeX7cfCgt3yxoGCJYKmF6t5mvo76JY0MWw'
'A53BxeP/oyXmJ93uHG5mFRAsVAUKs\ncVVb0Xi6ujxZGVdDWFV696L0BNOoHTfXmac6IBoZQzNNK4n1AATqwo+z7a0pf'
'RrJ\nAgMBAAGjUDBOMB0GA1UdDgQWBBSKmi/ZKMuLN0ES7/jPa7q7jAjPiDAfBgNVHSME\nGDAWgBSKmi/ZKMuLN0ES7'
'/jPa7q7jAjPiDAMBgNVHRMEBTADAQH/MA0GCSqGSIb3\nDQEBDQUAA4GBAAg2a2kSn05NiUOuWOHwPUjW3wQRsGxPXtb'
'hWMhmNdCfKKteM2+/\nLd/jz5F3qkOgGQ3UDgr3SHEoWhnLaJMF4a2tm6vL2rEIfPEK81KhTTRxSsAgMVbU\nJXBz1md'
'6Ur0HlgQC7d1CHC8/xi2DDwHopLyxhogaZUxy9IaRxUEa2vJW\n-----END CERTIFICATE-----\n'),
]


def _sign_in(custom_token, api_key):
body = {'token' : custom_token.decode(), 'returnSecureToken' : True}
params = {'key' : api_key}
Expand All @@ -52,6 +77,10 @@ def _sign_in_with_password(email, password, api_key):
resp.raise_for_status()
return resp.json().get('idToken')

def _random_string(length=10):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))

def _random_id():
random_id = str(uuid.uuid4()).lower().replace('-', '')
email = 'test{0}@example.{1}.com'.format(random_id[:12], random_id[12:])
Expand Down Expand Up @@ -477,6 +506,163 @@ def test_email_sign_in_with_settings(new_user_email_unverified, api_key):
assert id_token is not None and len(id_token) > 0
assert auth.get_user(new_user_email_unverified.uid).email_verified


@pytest.fixture(scope='module')
def oidc_provider():
provider_config = _create_oidc_provider_config()
yield provider_config
auth.delete_oidc_provider_config(provider_config.provider_id)


def test_create_oidc_provider_config(oidc_provider):
assert isinstance(oidc_provider, auth.OIDCProviderConfig)
assert oidc_provider.client_id == 'OIDC_CLIENT_ID'
assert oidc_provider.issuer == 'https://oidc.com/issuer'
assert oidc_provider.display_name == 'OIDC_DISPLAY_NAME'
assert oidc_provider.enabled is True


def test_get_oidc_provider_config(oidc_provider):
provider_config = auth.get_oidc_provider_config(oidc_provider.provider_id)
assert isinstance(provider_config, auth.OIDCProviderConfig)
assert provider_config.provider_id == oidc_provider.provider_id
assert provider_config.client_id == 'OIDC_CLIENT_ID'
assert provider_config.issuer == 'https://oidc.com/issuer'
assert provider_config.display_name == 'OIDC_DISPLAY_NAME'
assert provider_config.enabled is True


def test_list_oidc_provider_configs(oidc_provider):
page = auth.list_oidc_provider_configs()
result = None
for provider_config in page.iterate_all():
if provider_config.provider_id == oidc_provider.provider_id:
result = provider_config
break

assert result is not None


def test_update_oidc_provider_config():
provider_config = _create_oidc_provider_config()
try:
provider_config = auth.update_oidc_provider_config(
provider_config.provider_id,
client_id='UPDATED_OIDC_CLIENT_ID',
issuer='https://oidc.com/updated_issuer',
display_name='UPDATED_OIDC_DISPLAY_NAME',
enabled=False)
assert provider_config.client_id == 'UPDATED_OIDC_CLIENT_ID'
assert provider_config.issuer == 'https://oidc.com/updated_issuer'
assert provider_config.display_name == 'UPDATED_OIDC_DISPLAY_NAME'
assert provider_config.enabled is False
finally:
auth.delete_oidc_provider_config(provider_config.provider_id)


def test_delete_oidc_provider_config():
provider_config = _create_oidc_provider_config()
auth.delete_oidc_provider_config(provider_config.provider_id)
with pytest.raises(auth.ConfigurationNotFoundError):
auth.get_oidc_provider_config(provider_config.provider_id)


@pytest.fixture(scope='module')
def saml_provider():
provider_config = _create_saml_provider_config()
yield provider_config
auth.delete_saml_provider_config(provider_config.provider_id)


def test_create_saml_provider_config(saml_provider):
assert isinstance(saml_provider, auth.SAMLProviderConfig)
assert saml_provider.idp_entity_id == 'IDP_ENTITY_ID'
assert saml_provider.sso_url == 'https://example.com/login'
assert saml_provider.x509_certificates == [X509_CERTIFICATES[0]]
assert saml_provider.rp_entity_id == 'RP_ENTITY_ID'
assert saml_provider.callback_url == 'https://projectId.firebaseapp.com/__/auth/handler'
assert saml_provider.display_name == 'SAML_DISPLAY_NAME'
assert saml_provider.enabled is True


def test_get_saml_provider_config(saml_provider):
provider_config = auth.get_saml_provider_config(saml_provider.provider_id)
assert isinstance(provider_config, auth.SAMLProviderConfig)
assert provider_config.provider_id == saml_provider.provider_id
assert provider_config.idp_entity_id == 'IDP_ENTITY_ID'
assert provider_config.sso_url == 'https://example.com/login'
assert provider_config.x509_certificates == [X509_CERTIFICATES[0]]
assert provider_config.rp_entity_id == 'RP_ENTITY_ID'
assert provider_config.callback_url == 'https://projectId.firebaseapp.com/__/auth/handler'
assert provider_config.display_name == 'SAML_DISPLAY_NAME'
assert provider_config.enabled is True


def test_list_saml_provider_configs(saml_provider):
page = auth.list_saml_provider_configs()
result = None
for provider_config in page.iterate_all():
if provider_config.provider_id == saml_provider.provider_id:
result = provider_config
break

assert result is not None


def test_update_saml_provider_config():
provider_config = _create_saml_provider_config()
try:
provider_config = auth.update_saml_provider_config(
provider_config.provider_id,
idp_entity_id='UPDATED_IDP_ENTITY_ID',
sso_url='https://example.com/updated_login',
x509_certificates=[X509_CERTIFICATES[1]],
rp_entity_id='UPDATED_RP_ENTITY_ID',
callback_url='https://updatedProjectId.firebaseapp.com/__/auth/handler',
display_name='UPDATED_SAML_DISPLAY_NAME',
enabled=False)
assert provider_config.idp_entity_id == 'UPDATED_IDP_ENTITY_ID'
assert provider_config.sso_url == 'https://example.com/updated_login'
assert provider_config.x509_certificates == [X509_CERTIFICATES[1]]
assert provider_config.rp_entity_id == 'UPDATED_RP_ENTITY_ID'
assert provider_config.callback_url == ('https://updatedProjectId.firebaseapp.com/'
'__/auth/handler')
assert provider_config.display_name == 'UPDATED_SAML_DISPLAY_NAME'
assert provider_config.enabled is False
finally:
auth.delete_saml_provider_config(provider_config.provider_id)


def test_delete_saml_provider_config():
provider_config = _create_saml_provider_config()
auth.delete_saml_provider_config(provider_config.provider_id)
with pytest.raises(auth.ConfigurationNotFoundError):
auth.get_saml_provider_config(provider_config.provider_id)


def _create_oidc_provider_config():
provider_id = 'oidc.{0}'.format(_random_string())
return auth.create_oidc_provider_config(
provider_id=provider_id,
client_id='OIDC_CLIENT_ID',
issuer='https://oidc.com/issuer',
display_name='OIDC_DISPLAY_NAME',
enabled=True)


def _create_saml_provider_config():
provider_id = 'saml.{0}'.format(_random_string())
return auth.create_saml_provider_config(
provider_id=provider_id,
idp_entity_id='IDP_ENTITY_ID',
sso_url='https://example.com/login',
x509_certificates=[X509_CERTIFICATES[0]],
rp_entity_id='RP_ENTITY_ID',
callback_url='https://projectId.firebaseapp.com/__/auth/handler',
display_name='SAML_DISPLAY_NAME',
enabled=True)


class CredentialWrapper(credentials.Base):
"""A custom Firebase credential that wraps an OAuth2 token."""

Expand Down
Loading