api_app/analyzers_manager/file_analyzers/mobsf_service.py (128 additions, 0 deletions)
@@ -0,0 +1,128 @@
import logging

import requests

from api_app.analyzers_manager.classes import FileAnalyzer
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

logger = logging.getLogger(__name__)


class MobSF_Service(FileAnalyzer):
mobsf_host: str
_mobsf_api_key: str

def update(self) -> bool:
pass

def static_analysis(self, scan_hash, headers):

scan_url = self.mobsf_host + "/api/v1/scan"
data = {"hash": scan_hash}
scan_response = requests.post(url=scan_url, data=data, headers=headers)
scan_response.raise_for_status()

report_url = self.mobsf_host + "/api/v1/report_json"
report_response = requests.post(url=report_url, data=data, headers=headers)
report_response.raise_for_status()
return report_response.json()

Member:

About the static analysis, I am worried about execution time. By default, requests does not set any timeout. This is potentially dangerous in IntelOwl because it could lead to jobs that never end, creating bottlenecks in the overall application.
IntelOwl cannot rely on external tools. This is the reason why we need to explicitly set timeouts on all the requests calls you make in this analyzer. Which value? What does your experience with this tool tell you? I think static analysis could last several seconds. Would 30 seconds as a default make sense?
Plus, this value should be a parameter of the analyzer, customizable so that users are not bound to it.

Member Author (@spoiicy, Dec 3, 2024):

Though I've not used MobSF extensively, the apps I've tested did not take more than 30 seconds for static analysis. I understand it would be better to set a parameter for this so that end users can modify it according to their needs.

But I want to understand whether we need this timeout at all: when I was implementing this analyzer, it was failing at the dynamic analysis stage with a SoftTimeLimitExceeded error. I had to increase the soft time limit in the analyzer config to 400 for it to succeed; below 250, if I remember correctly, my job was failing.

Even in the worst-case scenario, where our MobSF doesn't respond to the request and the request keeps hanging, wouldn't Celery's soft time limit take care of this and end the job after a specific period of time?

I do understand that setting a default timeout would be better for error handling, but wouldn't configuring the soft time limit alone take care of everything? Let me know what you think!

Member:

"Even in the worst-case scenario, where our MobSF doesn't respond to the request and the request keeps hanging, wouldn't Celery's soft time limit take care of this and end the job after a specific period of time?"

Yes, but that would be the last resort and should be treated as an unmanaged exception. It's like catching a generic Exception in Python code: you know something bad happened, but you did not manage it properly because you don't know exactly what was happening. Pretty much the same thing here. The soft time limit should be treated as the emergency exception.

About adding the timeout, we already did that for other similar services, as you can see in other analyzers: high soft time limits and customizable timeouts, with low timeout values by default.

Regarding the values you proposed, I think even more than 400 could be set, maybe 1000, to handle the worst-case scenarios.

Member Author:

I completely get your point about the unmanaged exception. I'll provide a request-timeout parameter with a reasonable default value that can be modified per the user's needs.

I just want to understand one last thing: say a MobSF job has been triggered where the default soft time limit is 60 seconds, but the user has set the request timeout to 400 seconds. Would the 400-second timeout be respected, or would the job fail with SoftTimeLimitExceeded?

From my limited understanding of IntelOwl, if the allotted time for a job is 60 seconds, the job will end even if we set the request timeout higher than the soft time limit. So along with setting the request timeout, we'll have to configure the soft time limit to a decent time period so that the job succeeds.

Let me know if my understanding is correct.

Member:

"Say a MobSF job has been triggered where the default soft time limit is 60 seconds, but the user has set the request timeout to 400 seconds. Would the 400-second timeout be respected, or would the job fail with SoftTimeLimitExceeded?"

The job would fail with SoftTimeLimitExceeded.

"We'll have to configure the soft time limit to a decent time period so that the job succeeds."

Exactly. That way, soft-time-limit exceptions are limited to unwanted situations. I understand the user could set the timeout so high that the soft time limit is always reached, but that's something we can ignore to avoid over-engineering this.
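
A minimal sketch of what the agreed change could look like, assuming a hypothetical timeout analyzer parameter (e.g. defaulting to 30 seconds) exposed as a class attribute, following the same pattern as mobsf_host, and passed to every requests call:

class MobSF_Service(FileAnalyzer):
    mobsf_host: str
    _mobsf_api_key: str
    timeout: int  # hypothetical analyzer parameter, e.g. 30 seconds by default

    def static_analysis(self, scan_hash, headers):
        scan_url = self.mobsf_host + "/api/v1/scan"
        data = {"hash": scan_hash}
        # an explicit timeout prevents a hung MobSF instance from stalling
        # the job until Celery's SoftTimeLimitExceeded fires
        scan_response = requests.post(
            url=scan_url, data=data, headers=headers, timeout=self.timeout
        )
        scan_response.raise_for_status()

        report_url = self.mobsf_host + "/api/v1/report_json"
        report_response = requests.post(
            url=report_url, data=data, headers=headers, timeout=self.timeout
        )
        report_response.raise_for_status()
        return report_response.json()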

def dynamic_analysis(self, scan_hash, headers):

Member:

add a log here please
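
For instance, a single line using the module-level logger already defined at the top of the file (the job_id attribute is assumed to be available on the analyzer, as on other IntelOwl plugins):

logger.info(f"Starting MobSF dynamic analysis for job {self.job_id}, hash {scan_hash}")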

mobsfy_runtime_response = requests.post(
url=self.mobsf_host + "/api/v1/android/mobsfy",
headers=headers,
data={"identifier": "127.0.0.1:6555"},
Member:

This should be a parameter because it can change between environments.

)
mobsfy_runtime_response.raise_for_status()
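
A possible shape for that change, assuming a hypothetical identifier analyzer parameter whose default could stay "127.0.0.1:6555":

mobsfy_runtime_response = requests.post(
    url=self.mobsf_host + "/api/v1/android/mobsfy",
    headers=headers,
    # the emulator/device identifier becomes an analyzer parameter so each
    # deployment can point at its own instance
    data={"identifier": self.identifier},
)
mobsfy_runtime_response.raise_for_status()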

start_dynamic_analysis_response = requests.post(
url=self.mobsf_host + "/api/v1/dynamic/start_analysis",
headers=headers,
data={"hash": scan_hash},
)
start_dynamic_analysis_response.raise_for_status()

tls_tests_response = requests.post(
url=self.mobsf_host + "/api/v1/android/tls_tests",
headers=headers,
data={"hash": scan_hash},
)
tls_tests_response.raise_for_status()

start_frida_instrumentation_response = requests.post(
url=self.mobsf_host + "/api/v1/frida/instrument",
headers=headers,
data={
"hash": scan_hash,
"default_hooks": "api_monitor,ssl_pinning_bypass,root_bypass,debugger_check_bypass",
"auxiliary_hooks": "",
"frida_code": "",
Member:

These three values should be parameters of the analyzer. That way, users can choose how to leverage dynamic analysis as they wish.

},
)
start_frida_instrumentation_response.raise_for_status()
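
A sketch of the parameterized version, assuming hypothetical default_hooks, auxiliary_hooks, and frida_code analyzer parameters, with the current literals kept as defaults:

start_frida_instrumentation_response = requests.post(
    url=self.mobsf_host + "/api/v1/frida/instrument",
    headers=headers,
    data={
        "hash": scan_hash,
        # all three values become analyzer parameters so users can tune
        # the Frida instrumentation per job
        "default_hooks": self.default_hooks,
        "auxiliary_hooks": self.auxiliary_hooks,
        "frida_code": self.frida_code,
    },
)
start_frida_instrumentation_response.raise_for_status()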

get_runtime_dependency_response = requests.post(
url=self.mobsf_host + "/api/v1/frida/get_dependencies",
headers=headers,
data={"hash": scan_hash},
)
get_runtime_dependency_response.raise_for_status()

stop_dynamic_analysis = requests.post(
url=self.mobsf_host + "/api/v1/dynamic/stop_analysis",
Member:

As you suggested, dynamic analysis time is debatable. This is why I think we should add a sleep here, and that it should be a parameter of the analyzer. Which default? 20-30 seconds? What do you think?

Member Author (@spoiicy, Dec 3, 2024):

As I commented above, wouldn't configuring the soft time limit alone do the trick? If I keep it at the default value of 60, a SoftTimeLimitExceeded error is thrown and my job fails; I had to set it to 400 for the job to succeed. Let me know what you think!

Member:

see previous comment

Member:

I noticed that no sleep was added, so the analysis has a really brief duration: just the time needed to execute all the steps between the "start" and "stop" calls. As mentioned, I would add a sleep between "start" and the later calls, to give everything time to start properly and the code time to load, and maybe to let the malware do some nasty things too.

headers=headers,
data={"hash": scan_hash},
)
stop_dynamic_analysis.raise_for_status()
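
A sketch of the suggested wait, assuming a hypothetical dynamic_analysis_wait analyzer parameter (e.g. 30 seconds by default) applied right after the start_analysis call:

import time

start_dynamic_analysis_response = requests.post(
    url=self.mobsf_host + "/api/v1/dynamic/start_analysis",
    headers=headers,
    data={"hash": scan_hash},
)
start_dynamic_analysis_response.raise_for_status()
# give the app and the instrumentation time to start properly, and let any
# malicious behavior surface before collecting results and stopping
time.sleep(self.dynamic_analysis_wait)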

dynamic_analysis_report = requests.post(
url=self.mobsf_host + "/api/v1/dynamic/report_json",
headers=headers,
data={"hash": scan_hash},
)
dynamic_analysis_report.raise_for_status()

return dynamic_analysis_report.json()

def run(self):
headers = {"X-Mobsf-Api-Key": self._mobsf_api_key}
binary = self.read_file_bytes()

upload_url = self.mobsf_host + "/api/v1/upload"
upload_response = requests.post(
url=upload_url,
files={"file": (self.filename, binary, "application/octet-stream")},
headers=headers,
)
upload_response.raise_for_status()
scan_hash = upload_response.json()["hash"]

static_analysis_json = self.static_analysis(scan_hash, headers)
dynamic_analysis_json = self.dynamic_analysis(scan_hash, headers)

results = {
"static_analysis_results": static_analysis_json,
"dynamic_analysis_results": dynamic_analysis_json,
}

return results

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch(
"requests.post",
return_value=MockUpResponse(
{
"file_name": "diva-beta.apk",
"hash": "82ab8b2193b3cfb1c737e3a786be363a",
"scan_type": "apk",
},
200,
),
),
)
]
return super()._monkeypatch(patches=patches)
@@ -0,0 +1,150 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "MobSF_Service",
"description": "[MobSF_Service](https://github.com/MobSF/Mobile-Security-Framework-MobSF) can be used for a variety of use cases such as mobile application security, penetration testing, malware analysis, and privacy analysis.",
"disabled": False,
"soft_time_limit": 400,
"routing_key": "default",
"health_check_status": True,
"type": "file",
"docker_based": False,
"maximum_tlp": "RED",
Member:

Even if you have control of the MobSF instance, it is still an external service, so by definition this should be AMBER.

"observable_supported": [],
"supported_filetypes": [
"application/vnd.android.package-archive",
"application/zip",
],
Member:

You can use this list for Android:

application/java-archive
application/vnd.android.package-archive
application/x-dex
application/zip

"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "mobsf_api_key",
"type": "str",
"description": "MobSF API Key",
"is_secret": True,
"required": True,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "mobsf_host",
"type": "str",
"description": "The IP address where your mobsf is hosted.",
"is_secret": False,
"required": True,
},
]
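
If the request timeout discussed above becomes a parameter, the params list would presumably gain an entry along these lines (the name, type, and wording are assumptions, and its default value would be registered in the values list below):

{
    "python_module": {
        "module": "mobsf_service.MobSF_Service",
        "base_path": "api_app.analyzers_manager.file_analyzers",
    },
    "name": "timeout",  # hypothetical parameter name
    "type": "int",
    "description": "Timeout (in seconds) for each request made to the MobSF instance.",
    "is_secret": False,
    "required": False,
},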

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0064_vt_sample_download"),
("analyzers_manager", "0138_alter_analyzerreport_data_model_content_type"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]