|
| 1 | +import datetime |
| 2 | +from unittest.mock import AsyncMock, patch |
| 3 | + |
| 4 | +import pytest |
| 5 | + |
| 6 | +from codegate.db.models import Session, Workspace, WorkspaceActive |
| 7 | +from codegate.pipeline.workspace.workspace import WorkspaceCommands |
| 8 | + |
| 9 | + |
| 10 | +@pytest.mark.asyncio |
| 11 | +@pytest.mark.parametrize( |
| 12 | + "mock_workspaces, expected_output", |
| 13 | + [ |
| 14 | + # Case 1: No workspaces |
| 15 | + ([], ""), |
| 16 | + # Case 2: One workspace active |
| 17 | + ( |
| 18 | + [ |
| 19 | + # We'll make a MagicMock that simulates a workspace |
| 20 | + # with 'name' attribute and 'active_workspace_id' set |
| 21 | + WorkspaceActive(id="1", name="Workspace1", active_workspace_id="100") |
| 22 | + ], |
| 23 | + "- Workspace1 **(active)**\n", |
| 24 | + ), |
| 25 | + # Case 3: Multiple workspaces, second one active |
| 26 | + ( |
| 27 | + [ |
| 28 | + WorkspaceActive(id="1", name="Workspace1", active_workspace_id=None), |
| 29 | + WorkspaceActive(id="2", name="Workspace2", active_workspace_id="200"), |
| 30 | + ], |
| 31 | + "- Workspace1\n- Workspace2 **(active)**\n", |
| 32 | + ), |
| 33 | + ], |
| 34 | +) |
| 35 | +async def test_list_workspaces(mock_workspaces, expected_output): |
| 36 | + """ |
| 37 | + Test _list_workspaces with different sets of returned workspaces. |
| 38 | + """ |
| 39 | + workspace_commands = WorkspaceCommands() |
| 40 | + |
| 41 | + # Mock DbReader inside workspace_commands |
| 42 | + mock_db_reader = AsyncMock() |
| 43 | + mock_db_reader.get_workspaces.return_value = mock_workspaces |
| 44 | + workspace_commands._db_reader = mock_db_reader |
| 45 | + |
| 46 | + # Call the method |
| 47 | + result = await workspace_commands._list_workspaces() |
| 48 | + |
| 49 | + # Check the result |
| 50 | + assert result == expected_output |
| 51 | + mock_db_reader.get_workspaces.assert_awaited_once() |
| 52 | + |
| 53 | + |
| 54 | +@pytest.mark.asyncio |
| 55 | +@pytest.mark.parametrize( |
| 56 | + "args, existing_workspaces, expected_message", |
| 57 | + [ |
| 58 | + # Case 1: No workspace name provided |
| 59 | + ([], [], "Please provide a name. Use `codegate-workspace add your_workspace_name`"), |
| 60 | + # Case 2: Workspace name is empty string |
| 61 | + ([""], [], "Please provide a name. Use `codegate-workspace add your_workspace_name`"), |
| 62 | + # Case 3: Workspace already exists |
| 63 | + ( |
| 64 | + ["myworkspace"], |
| 65 | + [Workspace(name="myworkspace", id="1")], |
| 66 | + "Workspace **myworkspace** already exists", |
| 67 | + ), |
| 68 | + # Case 4: Successful add |
| 69 | + (["myworkspace"], [], "Workspace **myworkspace** has been added"), |
| 70 | + ], |
| 71 | +) |
| 72 | +async def test_add_workspaces(args, existing_workspaces, expected_message): |
| 73 | + """ |
| 74 | + Test _add_workspace under different scenarios: |
| 75 | + - no args |
| 76 | + - empty string arg |
| 77 | + - workspace already exists |
| 78 | + - workspace successfully added |
| 79 | + """ |
| 80 | + workspace_commands = WorkspaceCommands() |
| 81 | + |
| 82 | + # Mock the DbReader to return existing_workspaces |
| 83 | + mock_db_reader = AsyncMock() |
| 84 | + mock_db_reader.get_workspace_by_name.return_value = existing_workspaces |
| 85 | + workspace_commands._db_reader = mock_db_reader |
| 86 | + |
| 87 | + # We'll also patch DbRecorder to ensure no real DB operations happen |
| 88 | + with patch( |
| 89 | + "codegate.pipeline.workspace.workspace.DbRecorder", autospec=True |
| 90 | + ) as mock_recorder_cls: |
| 91 | + mock_recorder = mock_recorder_cls.return_value |
| 92 | + mock_recorder.add_workspace = AsyncMock() |
| 93 | + |
| 94 | + # Call the method |
| 95 | + result = await workspace_commands._add_workspace(*args) |
| 96 | + |
| 97 | + # Assertions |
| 98 | + assert result == expected_message |
| 99 | + |
| 100 | + # If expected_message indicates "added", we expect add_workspace to be called once |
| 101 | + if "has been added" in expected_message: |
| 102 | + mock_recorder.add_workspace.assert_awaited_once_with(args[0]) |
| 103 | + else: |
| 104 | + mock_recorder.add_workspace.assert_not_awaited() |
| 105 | + |
| 106 | + |
| 107 | +@pytest.mark.asyncio |
| 108 | +@pytest.mark.parametrize( |
| 109 | + "args, workspace_exists, sessions, expected_message", |
| 110 | + [ |
| 111 | + # Case 1: No name provided |
| 112 | + ([], False, [], "Please provide a name. Use `codegate-workspace activate workspace_name`"), |
| 113 | + # Case 2: Workspace does not exist |
| 114 | + ( |
| 115 | + ["non_existing_ws"], |
| 116 | + False, |
| 117 | + [], |
| 118 | + ( |
| 119 | + "Workspace **non_existing_ws** does not exist. " |
| 120 | + "Use `codegate-workspace add non_existing_ws` to add it" |
| 121 | + ), |
| 122 | + ), |
| 123 | + # Case 3: No active session found |
| 124 | + (["myworkspace"], True, [], "Something went wrong. No active session found."), |
| 125 | + # Case 4: Workspace is already active |
| 126 | + ( |
| 127 | + ["myworkspace"], |
| 128 | + True, |
| 129 | + [Session(id="1", active_workspace_id="10", last_update=datetime.datetime.now())], |
| 130 | + "Workspace **myworkspace** is already active", |
| 131 | + ), |
| 132 | + # Case 5: Successfully activate new workspace |
| 133 | + ( |
| 134 | + ["myworkspace"], |
| 135 | + True, |
| 136 | + [ |
| 137 | + # This session has a different active_workspace_id (99), so we can activate 10 |
| 138 | + Session(id="1", active_workspace_id="99", last_update=datetime.datetime.now()) |
| 139 | + ], |
| 140 | + "Workspace **myworkspace** has been activated", |
| 141 | + ), |
| 142 | + ], |
| 143 | +) |
| 144 | +async def test_activate_workspace(args, workspace_exists, sessions, expected_message): |
| 145 | + """ |
| 146 | + Test _activate_workspace under various conditions: |
| 147 | + - no name provided |
| 148 | + - workspace not found |
| 149 | + - session not found |
| 150 | + - workspace already active |
| 151 | + - successful activation |
| 152 | + """ |
| 153 | + workspace_commands = WorkspaceCommands() |
| 154 | + |
| 155 | + # Mock the DbReader to return either an empty list or a mock workspace |
| 156 | + mock_db_reader = AsyncMock() |
| 157 | + |
| 158 | + if workspace_exists: |
| 159 | + # We'll pretend we found a workspace: ID = 10 |
| 160 | + mock_workspace = Workspace(id="10", name=args[0]) |
| 161 | + mock_db_reader.get_workspace_by_name.return_value = [mock_workspace] |
| 162 | + else: |
| 163 | + mock_db_reader.get_workspace_by_name.return_value = [] |
| 164 | + |
| 165 | + # Return the sessions for get_sessions |
| 166 | + mock_db_reader.get_sessions.return_value = sessions |
| 167 | + |
| 168 | + workspace_commands._db_reader = mock_db_reader |
| 169 | + |
| 170 | + with patch( |
| 171 | + "codegate.pipeline.workspace.workspace.DbRecorder", autospec=True |
| 172 | + ) as mock_recorder_cls: |
| 173 | + mock_recorder = mock_recorder_cls.return_value |
| 174 | + mock_recorder.update_session = AsyncMock() |
| 175 | + |
| 176 | + result = await workspace_commands._activate_workspace(*args) |
| 177 | + |
| 178 | + assert result == expected_message |
| 179 | + |
| 180 | + # If we expect a successful activation, check that update_session was called |
| 181 | + if "has been activated" in expected_message: |
| 182 | + mock_recorder.update_session.assert_awaited_once() |
| 183 | + updated_session = mock_recorder.update_session.await_args[0][0] |
| 184 | + # Check that active_workspace_id is changed to 10 (our mock workspace ID) |
| 185 | + assert updated_session.active_workspace_id == "10" |
| 186 | + # Check that last_update was set to now |
| 187 | + assert isinstance(updated_session.last_update, datetime.datetime) |
| 188 | + else: |
| 189 | + mock_recorder.update_session.assert_not_awaited() |
| 190 | + |
| 191 | + |
| 192 | +@pytest.mark.asyncio |
| 193 | +@pytest.mark.parametrize( |
| 194 | + "user_message, expected_command, expected_args, mocked_execute_response", |
| 195 | + [ |
| 196 | + ("codegate-workspace list", "list", [], "List workspaces output"), |
| 197 | + ("codegate-workspace add myws", "add", ["myws"], "Added workspace"), |
| 198 | + ("codegate-workspace activate myws", "activate", ["myws"], "Activated workspace"), |
| 199 | + ], |
| 200 | +) |
| 201 | +async def test_parse_execute_cmd( |
| 202 | + user_message, expected_command, expected_args, mocked_execute_response |
| 203 | +): |
| 204 | + """ |
| 205 | + Test parse_execute_cmd to ensure it parses the user message |
| 206 | + and calls the correct command with the correct args. |
| 207 | + """ |
| 208 | + workspace_commands = WorkspaceCommands() |
| 209 | + |
| 210 | + with patch.object( |
| 211 | + workspace_commands, "execute", return_value=mocked_execute_response |
| 212 | + ) as mock_execute: |
| 213 | + result = await workspace_commands.parse_execute_cmd(user_message) |
| 214 | + assert result == mocked_execute_response |
| 215 | + |
| 216 | + # Verify 'execute' was called with the expected command and args |
| 217 | + mock_execute.assert_awaited_once_with(expected_command, *expected_args) |
0 commit comments