diff --git a/Packs/Code42/Integrations/Code42/Code42.py b/Packs/Code42/Integrations/Code42/Code42.py index fb48223c408b..bc978dbdf330 100644 --- a/Packs/Code42/Integrations/Code42/Code42.py +++ b/Packs/Code42/Integrations/Code42/Code42.py @@ -156,25 +156,34 @@ class Code42Client(BaseClient): def __init__(self, sdk, base_url, auth, verify=True, proxy=False): super().__init__(base_url, verify=verify, proxy=proxy) - # Create the Code42 SDK instance - self._sdk = sdk or py42.sdk.from_local_account(base_url, auth[0], auth[1]) + # Allow sdk parameter for unit testing. + # Otherwise, lazily load the SDK so that the TEST Command can effectively check auth. + self._sdk = sdk + self._sdk_factory = lambda: py42.sdk.from_local_account(base_url, auth[0], auth[1]) if not self._sdk else None py42.settings.set_user_agent_suffix("Cortex XSOAR") + def _get_sdk(self): + if self._sdk is None: + self._sdk = self._sdk_factory() + return self._sdk + def add_user_to_departing_employee(self, username, departure_date=None, note=None): user_id = self.get_user_id(username) - self._sdk.detectionlists.departing_employee.add(user_id, departure_date=departure_date) + self._get_sdk().detectionlists.departing_employee.add( + user_id, departure_date=departure_date + ) if note: - self._sdk.detectionlists.update_user_notes(user_id, note) + self._get_sdk().detectionlists.update_user_notes(user_id, note) return user_id def remove_user_from_departing_employee(self, username): user_id = self.get_user_id(username) - self._sdk.detectionlists.departing_employee.remove(user_id) + self._get_sdk().detectionlists.departing_employee.remove(user_id) return user_id def get_all_departing_employees(self): res = [] - pages = self._sdk.detectionlists.departing_employee.get_all() + pages = self._get_sdk().detectionlists.departing_employee.get_all() for page in pages: employees = page["items"] res.extend(employees) @@ -182,62 +191,62 @@ def get_all_departing_employees(self): def add_user_to_high_risk_employee(self, username, note=None): user_id = self.get_user_id(username) - self._sdk.detectionlists.high_risk_employee.add(user_id) + self._get_sdk().detectionlists.high_risk_employee.add(user_id) if note: - self._sdk.detectionlists.update_user_notes(user_id, note) + self._get_sdk().detectionlists.update_user_notes(user_id, note) return user_id def remove_user_from_high_risk_employee(self, username): user_id = self.get_user_id(username) - self._sdk.detectionlists.high_risk_employee.remove(user_id) + self._get_sdk().detectionlists.high_risk_employee.remove(user_id) return user_id def add_user_risk_tags(self, username, risk_tags): user_id = self.get_user_id(username) - self._sdk.detectionlists.add_user_risk_tags(user_id, risk_tags) + self._get_sdk().detectionlists.add_user_risk_tags(user_id, risk_tags) return user_id def remove_user_risk_tags(self, username, risk_tags): user_id = self.get_user_id(username) - self._sdk.detectionlists.remove_user_risk_tags(user_id, risk_tags) + self._get_sdk().detectionlists.remove_user_risk_tags(user_id, risk_tags) return user_id def get_all_high_risk_employees(self, risk_tags=None): if isinstance(risk_tags, str): risk_tags = [risk_tags] res = [] - pages = self._sdk.detectionlists.high_risk_employee.get_all() + pages = self._get_sdk().detectionlists.high_risk_employee.get_all() for page in pages: res.extend(_get_all_high_risk_employees_from_page(page, risk_tags)) return res def fetch_alerts(self, start_time, event_severity_filter): query = _create_alert_query(event_severity_filter, start_time) - res = self._sdk.alerts.search(query) + res = self._get_sdk().alerts.search(query) return res["alerts"] def get_alert_details(self, alert_id): - res = self._sdk.alerts.get_details(alert_id)["alerts"] + res = self._get_sdk().alerts.get_details(alert_id)["alerts"] if not res: raise Exception("No alert found with ID {0}.".format(alert_id)) return res[0] def resolve_alert(self, id): - self._sdk.alerts.resolve(id) + self._get_sdk().alerts.resolve(id) return id def get_current_user(self): - res = self._sdk.users.get_current() + res = self._get_sdk().users.get_current() return res def get_user_id(self, username): - res = self._sdk.users.get_by_username(username)["users"] + res = self._get_sdk().users.get_by_username(username)["users"] if not res: raise Exception("No user found with username {0}.".format(username)) return res[0]["userUid"] def search_file_events(self, payload): - res = self._sdk.securitydata.search_file_events(payload) + res = self._get_sdk().securitydata.search_file_events(payload) return res["fileEvents"] @@ -350,7 +359,7 @@ class ObservationToSecurityQueryMapper(object): exposure_type_map = { "PublicSearchableShare": ExposureType.IS_PUBLIC, "PublicLinkShare": ExposureType.SHARED_VIA_LINK, - "SharedOutsideTrustedDomain": "OutsideTrustedDomains" + "SharedOutsideTrustedDomain": "OutsideTrustedDomains", } def __init__(self, observation, actor): @@ -804,11 +813,16 @@ def securitydata_search_command(client, args): def test_module(client): - if client.get_current_user(): + try: + # Will fail if unauthorized + client.get_current_user() return "ok" - return "Invalid credentials or host address. Check that the username and password are correct, \ - that the host is available and reachable, and that you have supplied the full scheme, \ - domain, and port (e.g. https://myhost.code42.com:4285)" + except Exception: + return ( + "Invalid credentials or host address. Check that the username and password are correct, that the host " + "is available and reachable, and that you have supplied the full scheme, domain, and port " + "(e.g. https://myhost.code42.com:4285)." + ) def main(): diff --git a/Packs/Code42/Integrations/Code42/Code42_test.py b/Packs/Code42/Integrations/Code42/Code42_test.py index e30512bac844..02cccc97722f 100644 --- a/Packs/Code42/Integrations/Code42/Code42_test.py +++ b/Packs/Code42/Integrations/Code42/Code42_test.py @@ -1074,6 +1074,18 @@ def get_empty_detectionlist_response(mocker, base_text): """TESTS""" +def test_client_lazily_inits_sdk(mocker): + mocker.patch("py42.sdk.from_local_account") + + # test that sdk does not init during ctor + client = Code42Client(sdk=None, base_url=MOCK_URL, auth=MOCK_AUTH, verify=False, proxy=None) + assert client._sdk is None + + # test that sdk init from first method call + client.get_user_id("Test") + assert client._sdk is not None + + def test_client_when_no_alert_found_raises_exception(code42_sdk_mock): code42_sdk_mock.alerts.get_details.return_value = ( """{'type$': 'ALERT_DETAILS_RESPONSE', 'alerts': []}""" diff --git a/Packs/Code42/Integrations/Code42/integration-Code42.yml b/Packs/Code42/Integrations/Code42/integration-Code42.yml index 0bd326f1a27a..8d06d429c694 100644 --- a/Packs/Code42/Integrations/Code42/integration-Code42.yml +++ b/Packs/Code42/Integrations/Code42/integration-Code42.yml @@ -11,7 +11,7 @@ configuration: defaultvalue: console.us.code42.com type: 0 required: true -- display: "" +- display: "Username" name: credentials defaultvalue: "" type: 9 @@ -227,25 +227,34 @@ script: def __init__(self, sdk, base_url, auth, verify=True, proxy=False): super().__init__(base_url, verify=verify, proxy=proxy) - # Create the Code42 SDK instance - self._sdk = sdk or py42.sdk.from_local_account(base_url, auth[0], auth[1]) + # Allow sdk parameter for unit testing. + # Otherwise, lazily load the SDK so that the TEST Command can effectively check auth. + self._sdk = sdk + self._sdk_factory = lambda: py42.sdk.from_local_account(base_url, auth[0], auth[1]) if not self._sdk else None py42.settings.set_user_agent_suffix("Cortex XSOAR") + def _get_sdk(self): + if self._sdk is None: + self._sdk = self._sdk_factory() + return self._sdk + def add_user_to_departing_employee(self, username, departure_date=None, note=None): user_id = self.get_user_id(username) - self._sdk.detectionlists.departing_employee.add(user_id, departure_date=departure_date) + self._get_sdk().detectionlists.departing_employee.add( + user_id, departure_date=departure_date + ) if note: - self._sdk.detectionlists.update_user_notes(user_id, note) + self._get_sdk().detectionlists.update_user_notes(user_id, note) return user_id def remove_user_from_departing_employee(self, username): user_id = self.get_user_id(username) - self._sdk.detectionlists.departing_employee.remove(user_id) + self._get_sdk().detectionlists.departing_employee.remove(user_id) return user_id def get_all_departing_employees(self): res = [] - pages = self._sdk.detectionlists.departing_employee.get_all() + pages = self._get_sdk().detectionlists.departing_employee.get_all() for page in pages: employees = page["items"] res.extend(employees) @@ -253,62 +262,62 @@ script: def add_user_to_high_risk_employee(self, username, note=None): user_id = self.get_user_id(username) - self._sdk.detectionlists.high_risk_employee.add(user_id) + self._get_sdk().detectionlists.high_risk_employee.add(user_id) if note: - self._sdk.detectionlists.update_user_notes(user_id, note) + self._get_sdk().detectionlists.update_user_notes(user_id, note) return user_id def remove_user_from_high_risk_employee(self, username): user_id = self.get_user_id(username) - self._sdk.detectionlists.high_risk_employee.remove(user_id) + self._get_sdk().detectionlists.high_risk_employee.remove(user_id) return user_id def add_user_risk_tags(self, username, risk_tags): user_id = self.get_user_id(username) - self._sdk.detectionlists.add_user_risk_tags(user_id, risk_tags) + self._get_sdk().detectionlists.add_user_risk_tags(user_id, risk_tags) return user_id def remove_user_risk_tags(self, username, risk_tags): user_id = self.get_user_id(username) - self._sdk.detectionlists.remove_user_risk_tags(user_id, risk_tags) + self._get_sdk().detectionlists.remove_user_risk_tags(user_id, risk_tags) return user_id def get_all_high_risk_employees(self, risk_tags=None): if isinstance(risk_tags, str): risk_tags = [risk_tags] res = [] - pages = self._sdk.detectionlists.high_risk_employee.get_all() + pages = self._get_sdk().detectionlists.high_risk_employee.get_all() for page in pages: res.extend(_get_all_high_risk_employees_from_page(page, risk_tags)) return res def fetch_alerts(self, start_time, event_severity_filter): query = _create_alert_query(event_severity_filter, start_time) - res = self._sdk.alerts.search(query) + res = self._get_sdk().alerts.search(query) return res["alerts"] def get_alert_details(self, alert_id): - res = self._sdk.alerts.get_details(alert_id)["alerts"] + res = self._get_sdk().alerts.get_details(alert_id)["alerts"] if not res: raise Exception("No alert found with ID {0}.".format(alert_id)) return res[0] def resolve_alert(self, id): - self._sdk.alerts.resolve(id) + self._get_sdk().alerts.resolve(id) return id def get_current_user(self): - res = self._sdk.users.get_current() + res = self._get_sdk().users.get_current() return res def get_user_id(self, username): - res = self._sdk.users.get_by_username(username)["users"] + res = self._get_sdk().users.get_by_username(username)["users"] if not res: raise Exception("No user found with username {0}.".format(username)) return res[0]["userUid"] def search_file_events(self, payload): - res = self._sdk.securitydata.search_file_events(payload) + res = self._get_sdk().securitydata.search_file_events(payload) return res["fileEvents"] @@ -422,7 +431,7 @@ script: exposure_type_map = { "PublicSearchableShare": ExposureType.IS_PUBLIC, "PublicLinkShare": ExposureType.SHARED_VIA_LINK, - "SharedOutsideTrustedDomain": "OutsideTrustedDomains" + "SharedOutsideTrustedDomain": "OutsideTrustedDomains", } def __init__(self, observation, actor): @@ -892,11 +901,16 @@ script: def test_module(client): - if client.get_current_user(): + try: + # Will fail if unauthorized + client.get_current_user() return "ok" - return "Invalid credentials or host address. Check that the username and password are correct, \ - that the host is available and reachable, and that you have supplied the full scheme, \ - domain, and port (e.g. https://myhost.code42.com:4285)" + except Exception: + return ( + "Invalid credentials or host address. Check that the username and password are correct, that the host " + "is available and reachable, and that you have supplied the full scheme, domain, and port " + "(e.g. https://myhost.code42.com:4285)." + ) def main():