Skip to content

User update fix #811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 124 additions & 24 deletions tests/test_umapi_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from tests.util import compare_iter
from user_sync.connector.connector_umapi import Commands
from user_sync.engine.common import AdobeGroup
from user_sync.engine.umapi import UmapiTargetInfo, UmapiConnectors, RuleProcessor
from user_sync.engine.umapi import UmapiTargetInfo, UmapiConnectors, RuleProcessor, MultiIndex


@pytest.fixture
Expand Down Expand Up @@ -90,7 +90,7 @@ def progress_func(*_):
rp.logger.progress = progress_func

key = rp.get_user_key(user['identity_type'], user['username'], user['domain'])
rp.directory_user_by_user_key[key] = user
rp.directory_user_index = MultiIndex(data=[user], key_names=['email', 'username'])
rp.options['process_groups'] = True
rp.push_umapi = True

Expand Down Expand Up @@ -132,7 +132,7 @@ def update(up_user, up_attrs):
conn = MockUmapiConnector()
info = UmapiTargetInfo(None)
user_key = rp.get_user_key(up_user['identity_type'], up_user['username'], up_user['domain'])
rp.directory_user_by_user_key[user_key] = up_user
rp.directory_user_index = MultiIndex(data=[up_user], key_names=['email', 'username'])
commands = [rp.update_umapi_user(info, user_key, up_attrs, group_add, group_rem, mock_umapi_user)]
rp.execute_commands(commands, conn)
assert user_key in rp.updated_user_keys
Expand Down Expand Up @@ -326,9 +326,8 @@ def test_read_desired_user_groups_basic(rule_processor, mock_dir_user):
assert "Console Group" in rp.after_mapping_hook_scope['target_groups']

# Assert the user group updated in umapi info
user_key = rp.get_directory_user_key(mock_dir_user)
assert ('console group' in rp.umapi_info_by_name[None].desired_groups_by_user_key[user_key])
assert user_key in rp.filtered_directory_user_by_user_key
assert 'console group' in rp.umapi_info_by_name[None].get_desired_groups(email=mock_dir_user['email'], username=mock_dir_user['username'])['desired_groups']
assert mock_dir_user['email'] == rp.filtered_directory_user_index.data[0]['email']

@mock.patch('user_sync.helper.CSVAdapter.read_csv_rows')
def test_read_stray_key_map(csv_reader, rule_processor):
Expand Down Expand Up @@ -477,22 +476,123 @@ def compare_attr(text, target):
compare_attr(x[5], state['target_attributes'])


class TestUmapiTargetInfo():
def test_add_mapped_group(self):
umapi_target_info = UmapiTargetInfo("")
umapi_target_info.add_mapped_group("All Students")
assert "all students" in umapi_target_info.mapped_groups
assert "All Students" in umapi_target_info.non_normalize_mapped_groups

def test_add_additional_group(self):
umapi_target_info = UmapiTargetInfo("")
umapi_target_info.add_additional_group('old_name', 'new_name')
assert umapi_target_info.additional_group_map['old_name'][0] == 'new_name'

def test_add_desired_group_for(self):
umapi_target_info = UmapiTargetInfo("")
with mock.patch("user_sync.engine.umapi.UmapiTargetInfo.get_desired_groups") as mock_desired_groups:
mock_desired_groups.return_value = None
umapi_target_info.add_desired_group_for('user_key', 'group_name')
assert umapi_target_info.desired_groups_by_user_key['user_key'] == {'group_name'}
def test_targetinfo_add_mapped_group():
umapi_target_info = UmapiTargetInfo("")
umapi_target_info.add_mapped_group("All Students")
assert "all students" in umapi_target_info.mapped_groups
assert "All Students" in umapi_target_info.non_normalize_mapped_groups

def test_targetinfo_add_additional_group():
umapi_target_info = UmapiTargetInfo("")
umapi_target_info.add_additional_group('old_name', 'new_name')
assert umapi_target_info.additional_group_map['old_name'][0] == 'new_name'

def test_targetinfo_add_desired_group_for():
umapi_target_info = UmapiTargetInfo(None)
umapi_target_info.add_desired_group_for('federatedID', 'example.com', '[email protected]', '[email protected]', 'group_name')
assert 'group_name' in umapi_target_info.get_desired_groups('[email protected]', '[email protected]')['desired_groups']


@pytest.fixture
def test_data():
return [{
"email": "[email protected]",
"username": "[email protected]",
"firstname": "Test",
"lastname": "User 001",
},{
"email": "[email protected]",
"username": "[email protected]",
"firstname": "Test",
"lastname": "User 002",
},{
"email": "[email protected]",
"username": "[email protected]",
"firstname": "Test",
"lastname": "User 003",
}]


def test_new_multi_index(test_data):
"""Construct valid MultiIndex with no errors"""
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
assert user_index.data == test_data
assert user_index.key_names == ['email', 'username']
assert user_index.index['email'] == {'[email protected]': 0, '[email protected]': 1, '[email protected]': 2}
assert user_index.index['username'] == {'[email protected]': 0, '[email protected]': 1, '[email protected]': 2}


def test_new_multi_index_keyerr(test_data):
"""MultiIndex data must be complete"""
del test_data[1]['email']

with pytest.raises(KeyError):
MultiIndex(data=test_data, key_names=['email', 'username'])


def test_multi_index_retrieve(test_data):
"""Ensure we get one record from MultiIndex"""
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
# get a single user with exact email/username match
user = user_index.get(email='[email protected]', username='[email protected]')
assert user['firstname'] == 'Test'
assert user['lastname'] == 'User 003'

# get a single user where email matches but not username
user = user_index.get(email='[email protected]', username='[email protected]')
assert user['firstname'] == 'Test'
assert user['lastname'] == 'User 002'

# get a single record with a single key
user = user_index.get(email='[email protected]')
assert user['firstname'] == 'Test'
assert user['lastname'] == 'User 001'


def test_multi_index_retrieve_none(test_data):
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
result = user_index.get(email='[email protected]', username='[email protected]')
assert result is None


def test_multi_index_nonexistent_key(test_data):
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
with pytest.raises(KeyError):
user_index.get(foobar='[email protected]')


def test_multi_index_add_record(test_data):
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
user_index.add({
'email': '[email protected]',
'username': '[email protected]',
'firstname': 'Test',
'lastname': 'User 004',
})

user = user_index.get(email='[email protected]', username='[email protected]')
assert user['firstname'] == 'Test'
assert user['lastname'] == 'User 004'


def test_multi_index_add_record_err(test_data):
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
with pytest.raises(KeyError):
user_index.add({
'email': '[email protected]',
'firstname': 'Test',
'lastname': 'User 004',
})


def test_multi_index_update(test_data):
user_index = MultiIndex(data=test_data, key_names=['email', 'username'])
user = test_data[0].copy()
user['firstname'] = 'Test Updated'
user['lastname'] = 'User 001 Updated'

user_index.update(user, email=user['email'], username=user['username'])

user = user_index.get(email=user['email'], username=user['username'])
assert user['firstname'] == 'Test Updated'
assert user['lastname'] == 'User 001 Updated'
Loading