Skip to content

Fix test command #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions Packs/Code42/Integrations/Code42/Code42.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,88 +156,97 @@ class Code42Client(BaseClient):

def __init__(self, sdk, base_url, auth, verify=True, proxy=False):
super().__init__(base_url, verify=verify, proxy=proxy)
# Create the Code42 SDK instance
self._sdk = sdk or py42.sdk.from_local_account(base_url, auth[0], auth[1])
# Allow sdk parameter for unit testing.
# Otherwise, lazily load the SDK so that the TEST Command can effectively check auth.
self._sdk = sdk
self._sdk_factory = lambda: py42.sdk.from_local_account(base_url, auth[0], auth[1]) if not self._sdk else None
py42.settings.set_user_agent_suffix("Cortex XSOAR")

def _get_sdk(self):
if self._sdk is None:
self._sdk = self._sdk_factory()
return self._sdk

def add_user_to_departing_employee(self, username, departure_date=None, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.add(user_id, departure_date=departure_date)
self._get_sdk().detectionlists.departing_employee.add(
user_id, departure_date=departure_date
)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id

def remove_user_from_departing_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.remove(user_id)
self._get_sdk().detectionlists.departing_employee.remove(user_id)
return user_id

def get_all_departing_employees(self):
res = []
pages = self._sdk.detectionlists.departing_employee.get_all()
pages = self._get_sdk().detectionlists.departing_employee.get_all()
for page in pages:
employees = page["items"]
res.extend(employees)
return res

def add_user_to_high_risk_employee(self, username, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.add(user_id)
self._get_sdk().detectionlists.high_risk_employee.add(user_id)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id

def remove_user_from_high_risk_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.remove(user_id)
self._get_sdk().detectionlists.high_risk_employee.remove(user_id)
return user_id

def add_user_risk_tags(self, username, risk_tags):
user_id = self.get_user_id(username)
self._sdk.detectionlists.add_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.add_user_risk_tags(user_id, risk_tags)
return user_id

def remove_user_risk_tags(self, username, risk_tags):
user_id = self.get_user_id(username)
self._sdk.detectionlists.remove_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.remove_user_risk_tags(user_id, risk_tags)
return user_id

def get_all_high_risk_employees(self, risk_tags=None):
if isinstance(risk_tags, str):
risk_tags = [risk_tags]
res = []
pages = self._sdk.detectionlists.high_risk_employee.get_all()
pages = self._get_sdk().detectionlists.high_risk_employee.get_all()
for page in pages:
res.extend(_get_all_high_risk_employees_from_page(page, risk_tags))
return res

def fetch_alerts(self, start_time, event_severity_filter):
query = _create_alert_query(event_severity_filter, start_time)
res = self._sdk.alerts.search(query)
res = self._get_sdk().alerts.search(query)
return res["alerts"]

def get_alert_details(self, alert_id):
res = self._sdk.alerts.get_details(alert_id)["alerts"]
res = self._get_sdk().alerts.get_details(alert_id)["alerts"]
if not res:
raise Exception("No alert found with ID {0}.".format(alert_id))
return res[0]

def resolve_alert(self, id):
self._sdk.alerts.resolve(id)
self._get_sdk().alerts.resolve(id)
return id

def get_current_user(self):
res = self._sdk.users.get_current()
res = self._get_sdk().users.get_current()
return res

def get_user_id(self, username):
res = self._sdk.users.get_by_username(username)["users"]
res = self._get_sdk().users.get_by_username(username)["users"]
if not res:
raise Exception("No user found with username {0}.".format(username))
return res[0]["userUid"]

def search_file_events(self, payload):
res = self._sdk.securitydata.search_file_events(payload)
res = self._get_sdk().securitydata.search_file_events(payload)
return res["fileEvents"]


Expand Down Expand Up @@ -350,7 +359,7 @@ class ObservationToSecurityQueryMapper(object):
exposure_type_map = {
"PublicSearchableShare": ExposureType.IS_PUBLIC,
"PublicLinkShare": ExposureType.SHARED_VIA_LINK,
"SharedOutsideTrustedDomain": "OutsideTrustedDomains"
"SharedOutsideTrustedDomain": "OutsideTrustedDomains",
}

def __init__(self, observation, actor):
Expand Down Expand Up @@ -804,11 +813,16 @@ def securitydata_search_command(client, args):


def test_module(client):
if client.get_current_user():
try:
# Will fail if unauthorized
client.get_current_user()
return "ok"
return "Invalid credentials or host address. Check that the username and password are correct, \
that the host is available and reachable, and that you have supplied the full scheme, \
domain, and port (e.g. https://myhost.code42.com:4285)"
except Exception:
return (
"Invalid credentials or host address. Check that the username and password are correct, that the host "
"is available and reachable, and that you have supplied the full scheme, domain, and port "
"(e.g. https://myhost.code42.com:4285)."
)


def main():
Expand Down
12 changes: 12 additions & 0 deletions Packs/Code42/Integrations/Code42/Code42_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,18 @@ def get_empty_detectionlist_response(mocker, base_text):
"""TESTS"""


def test_client_lazily_inits_sdk(mocker):
mocker.patch("py42.sdk.from_local_account")

# test that sdk does not init during ctor
client = Code42Client(sdk=None, base_url=MOCK_URL, auth=MOCK_AUTH, verify=False, proxy=None)
assert client._sdk is None

# test that sdk init from first method call
client.get_user_id("Test")
assert client._sdk is not None


def test_client_when_no_alert_found_raises_exception(code42_sdk_mock):
code42_sdk_mock.alerts.get_details.return_value = (
"""{'type$': 'ALERT_DETAILS_RESPONSE', 'alerts': []}"""
Expand Down
62 changes: 38 additions & 24 deletions Packs/Code42/Integrations/Code42/integration-Code42.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ configuration:
defaultvalue: console.us.code42.com
type: 0
required: true
- display: ""
- display: "Username"
name: credentials
defaultvalue: ""
type: 9
Expand Down Expand Up @@ -227,88 +227,97 @@ script:

def __init__(self, sdk, base_url, auth, verify=True, proxy=False):
super().__init__(base_url, verify=verify, proxy=proxy)
# Create the Code42 SDK instance
self._sdk = sdk or py42.sdk.from_local_account(base_url, auth[0], auth[1])
# Allow sdk parameter for unit testing.
# Otherwise, lazily load the SDK so that the TEST Command can effectively check auth.
self._sdk = sdk
self._sdk_factory = lambda: py42.sdk.from_local_account(base_url, auth[0], auth[1]) if not self._sdk else None
py42.settings.set_user_agent_suffix("Cortex XSOAR")

def _get_sdk(self):
if self._sdk is None:
self._sdk = self._sdk_factory()
return self._sdk

def add_user_to_departing_employee(self, username, departure_date=None, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.add(user_id, departure_date=departure_date)
self._get_sdk().detectionlists.departing_employee.add(
user_id, departure_date=departure_date
)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id

def remove_user_from_departing_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.departing_employee.remove(user_id)
self._get_sdk().detectionlists.departing_employee.remove(user_id)
return user_id

def get_all_departing_employees(self):
res = []
pages = self._sdk.detectionlists.departing_employee.get_all()
pages = self._get_sdk().detectionlists.departing_employee.get_all()
for page in pages:
employees = page["items"]
res.extend(employees)
return res

def add_user_to_high_risk_employee(self, username, note=None):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.add(user_id)
self._get_sdk().detectionlists.high_risk_employee.add(user_id)
if note:
self._sdk.detectionlists.update_user_notes(user_id, note)
self._get_sdk().detectionlists.update_user_notes(user_id, note)
return user_id

def remove_user_from_high_risk_employee(self, username):
user_id = self.get_user_id(username)
self._sdk.detectionlists.high_risk_employee.remove(user_id)
self._get_sdk().detectionlists.high_risk_employee.remove(user_id)
return user_id

def add_user_risk_tags(self, username, risk_tags):
user_id = self.get_user_id(username)
self._sdk.detectionlists.add_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.add_user_risk_tags(user_id, risk_tags)
return user_id

def remove_user_risk_tags(self, username, risk_tags):
user_id = self.get_user_id(username)
self._sdk.detectionlists.remove_user_risk_tags(user_id, risk_tags)
self._get_sdk().detectionlists.remove_user_risk_tags(user_id, risk_tags)
return user_id

def get_all_high_risk_employees(self, risk_tags=None):
if isinstance(risk_tags, str):
risk_tags = [risk_tags]
res = []
pages = self._sdk.detectionlists.high_risk_employee.get_all()
pages = self._get_sdk().detectionlists.high_risk_employee.get_all()
for page in pages:
res.extend(_get_all_high_risk_employees_from_page(page, risk_tags))
return res

def fetch_alerts(self, start_time, event_severity_filter):
query = _create_alert_query(event_severity_filter, start_time)
res = self._sdk.alerts.search(query)
res = self._get_sdk().alerts.search(query)
return res["alerts"]

def get_alert_details(self, alert_id):
res = self._sdk.alerts.get_details(alert_id)["alerts"]
res = self._get_sdk().alerts.get_details(alert_id)["alerts"]
if not res:
raise Exception("No alert found with ID {0}.".format(alert_id))
return res[0]

def resolve_alert(self, id):
self._sdk.alerts.resolve(id)
self._get_sdk().alerts.resolve(id)
return id

def get_current_user(self):
res = self._sdk.users.get_current()
res = self._get_sdk().users.get_current()
return res

def get_user_id(self, username):
res = self._sdk.users.get_by_username(username)["users"]
res = self._get_sdk().users.get_by_username(username)["users"]
if not res:
raise Exception("No user found with username {0}.".format(username))
return res[0]["userUid"]

def search_file_events(self, payload):
res = self._sdk.securitydata.search_file_events(payload)
res = self._get_sdk().securitydata.search_file_events(payload)
return res["fileEvents"]


Expand Down Expand Up @@ -422,7 +431,7 @@ script:
exposure_type_map = {
"PublicSearchableShare": ExposureType.IS_PUBLIC,
"PublicLinkShare": ExposureType.SHARED_VIA_LINK,
"SharedOutsideTrustedDomain": "OutsideTrustedDomains"
"SharedOutsideTrustedDomain": "OutsideTrustedDomains",
}

def __init__(self, observation, actor):
Expand Down Expand Up @@ -892,11 +901,16 @@ script:


def test_module(client):
if client.get_current_user():
try:
# Will fail if unauthorized
client.get_current_user()
return "ok"
return "Invalid credentials or host address. Check that the username and password are correct, \
that the host is available and reachable, and that you have supplied the full scheme, \
domain, and port (e.g. https://myhost.code42.com:4285)"
except Exception:
return (
"Invalid credentials or host address. Check that the username and password are correct, that the host "
"is available and reachable, and that you have supplied the full scheme, domain, and port "
"(e.g. https://myhost.code42.com:4285)."
)


def main():
Expand Down