166 changes: 166 additions & 0 deletions api_app/analyzers_manager/file_analyzers/mobsf_service.py
@@ -0,0 +1,166 @@
import logging

import requests

from api_app.analyzers_manager.classes import FileAnalyzer
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

logger = logging.getLogger(__name__)


class MobSF_Service(FileAnalyzer):
mobsf_host: str
identifier: str
timeout: int = 30
enable_dynamic_analysis: bool = False
_mobsf_api_key: str
default_hooks: str = "root_bypass"
auxiliary_hooks: str = ""
frida_code: str = ""

def update(self) -> bool:
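        # no-op: there is nothing for this analyzer to periodically update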
pass

def static_analysis(self, scan_hash, headers):
logger.info(f"Initiating static analysis with scan hash: {scan_hash}")
scan_url = self.mobsf_host + "/api/v1/scan"
data = {"hash": scan_hash}
scan_response = requests.post(
url=scan_url, data=data, headers=headers, timeout=self.timeout
)
scan_response.raise_for_status()
logger.info("Static analysis completed successfully")
Member: I keep insisting on logs because they are really critical for maintenance and debugging, so I have more tweaks to ask of you :). Here there is a log without any dynamic variable. It would be very difficult to use in practice because you cannot distinguish it from the same log produced by other runs. Always add a reference to the file hash.


logger.info("Generating JSON Report for static analysis")
Member: You can merge this log with the previous one; they are right next to each other, so they make sense as a single log.
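        # A sketch applying both reviewer suggestions above (merged message,
        # scan hash included); not part of the submitted code:
        # logger.info(
        #     f"Static analysis completed for scan hash: {scan_hash}; generating JSON report"
        # )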

report_url = self.mobsf_host + "/api/v1/report_json"
report_response = requests.post(
url=report_url, data=data, headers=headers, timeout=self.timeout
)
report_response.raise_for_status()
return report_response.json()

def dynamic_analysis(self, scan_hash, headers):
logger.info(
f"Configuring runtime environment with android instance identifier: {self.identifier}"
)
mobsfy_runtime_response = requests.post(
url=self.mobsf_host + "/api/v1/android/mobsfy",
headers=headers,
data={"identifier": self.identifier},
timeout=self.timeout,
)
mobsfy_runtime_response.raise_for_status()

logger.info(f"Initiating dynamic analysis for scan hash: {scan_hash}")
start_dynamic_analysis_response = requests.post(
url=self.mobsf_host + "/api/v1/dynamic/start_analysis",
headers=headers,
data={"hash": scan_hash},
timeout=self.timeout,
)
start_dynamic_analysis_response.raise_for_status()
logger.info("Dynamic analyzer started successfully")

logger.info("Running tls tests")
tls_tests_response = requests.post(
url=self.mobsf_host + "/api/v1/android/tls_tests",
headers=headers,
data={"hash": scan_hash},
timeout=self.timeout,
)
tls_tests_response.raise_for_status()

logger.info("Starting frida instrumentation with user provided hooks and code")
start_frida_instrumentation_response = requests.post(
url=self.mobsf_host + "/api/v1/frida/instrument",
headers=headers,
data={
"hash": scan_hash,
"default_hooks": self.default_hooks,
"auxiliary_hooks": self.auxiliary_hooks,
"frida_code": self.frida_code,
},
timeout=self.timeout,
)
start_frida_instrumentation_response.raise_for_status()
logger.info("Frida instrumentation started successfully")

logger.info("Collecting runtime dependencies")
get_runtime_dependency_response = requests.post(
url=self.mobsf_host + "/api/v1/frida/get_dependencies",
headers=headers,
data={"hash": scan_hash},
timeout=self.timeout,
)
get_runtime_dependency_response.raise_for_status()
logger.info("Successfully collected runtime dependencies")

logger.info(
f"Stopping dyanmic analyzer and generating JSON report for scan hash: {scan_hash}"
)
stop_dynamic_analysis = requests.post(
url=self.mobsf_host + "/api/v1/dynamic/stop_analysis",
Member: As you suggested, the dynamic analysis duration is debatable. That is why I think we should add a sleep here, and it should be a parameter of the analyzer. Which default, 20-30 seconds? What do you think?

Member Author (@spoiicy, Dec 3, 2024): As I commented here, wouldn't just configuring the soft time limit do the trick? If I keep it at the default value of 60, a "SoftTimeLimitExceeded" error is thrown and my job fails. I had to set it to 400 for my job to succeed. Let me know what you think!

Member: See previous comment.

Member: I noticed that no sleep was added, so the analysis has a really brief duration: just the time needed to execute all the steps between the "start" and "stop" calls. As mentioned, I would add a sleep between "start" and the later calls, to give everything time to start properly and the code to load, and maybe to let the malware do some nasty things too.

headers=headers,
data={"hash": scan_hash},
timeout=self.timeout,
)
stop_dynamic_analysis.raise_for_status()

dynamic_analysis_report = requests.post(
url=self.mobsf_host + "/api/v1/dynamic/report_json",
headers=headers,
data={"hash": scan_hash},
timeout=self.timeout,
)
dynamic_analysis_report.raise_for_status()
logger.info("JSON report for dynamic analysis generated successfully")

return dynamic_analysis_report.json()

def run(self):
headers = {"X-Mobsf-Api-Key": self._mobsf_api_key}
binary = self.read_file_bytes()
logger.info("File bytes read successfully. Initiating upload request")

upload_url = self.mobsf_host + "/api/v1/upload"
upload_response = requests.post(
url=upload_url,
files={"file": (self.filename, binary, "application/octet-stream")},
headers=headers,
timeout=self.timeout,
)
upload_response.raise_for_status()
scan_hash = upload_response.json()["hash"]
logger.info(f"File upload successful with scan hash: {scan_hash}")

static_analysis_json = self.static_analysis(scan_hash, headers)
dynamic_analysis_json = (
self.dynamic_analysis(scan_hash, headers)
if self.enable_dynamic_analysis
else {}
)
results = {
"static_analysis_results": static_analysis_json,
"dynamic_analysis_results": dynamic_analysis_json,
}

return results

@classmethod
def _monkeypatch(cls):
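        # when connections are mocked, patch requests.post to return a canned MobSF upload response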
patches = [
if_mock_connections(
patch(
"requests.post",
return_value=MockUpResponse(
{
"file_name": "diva-beta.apk",
"hash": "82ab8b2193b3cfb1c737e3a786be363a",
"scan_type": "apk",
},
200,
),
),
)
]
return super()._monkeypatch(patches=patches)
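To make the sleep discussion in the review thread concrete, here is a minimal sketch under the thread's assumptions: a hypothetical analyzer parameter dynamic_analysis_wait_time, with the 30-second default floated by the reviewer, slept right after the start call. Neither the name nor the default is part of the submitted code.

import time

import requests

from api_app.analyzers_manager.classes import FileAnalyzer


class MobSF_Service(FileAnalyzer):
    # hypothetical parameter from the review thread; other attributes elided
    dynamic_analysis_wait_time: int = 30

    def dynamic_analysis(self, scan_hash, headers):
        start_response = requests.post(
            url=self.mobsf_host + "/api/v1/dynamic/start_analysis",
            headers=headers,
            data={"hash": scan_hash},
            timeout=self.timeout,
        )
        start_response.raise_for_status()
        # pause so the app starts, the Frida hooks load, and the sample has
        # time to exhibit runtime behavior before the later collection calls
        time.sleep(self.dynamic_analysis_wait_time)
        # ... tls_tests, frida/instrument, get_dependencies, stop_analysis as above ...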
@@ -0,0 +1,219 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "MobSF_Service",
"description": "[MobSF_service](https://github.com/MobSF/Mobile-Security-Framework-MobSF) can be used for a variety of use cases such as mobile application security, penetration testing, malware analysis, and privacy analysis.",
"disabled": False,
"soft_time_limit": 400,
Member: As mentioned in a previous comment, you can extend it to 1000 to allow for broader use cases.

"routing_key": "default",
"health_check_status": True,
"type": "file",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": [],
"supported_filetypes": [
"application/vnd.android.package-archive",
"application/x-dex",
"application/zip",
"application/java-archive",
],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"mapping_data_model": {},
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "mobsf_host",
"type": "str",
"description": "IP address where mobsf is hosted",
"is_secret": False,
"required": True,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "identifier",
"type": "str",
"description": "Android instance identifier",
"is_secret": False,
"required": True,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "timeout",
"type": "int",
"description": "Request timeout for each API call. Default value is 30 seconds",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "enable_dynamic_analysis",
"type": "bool",
"description": "Set to true to enable dynamic analyzer",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "default_hooks",
"type": "str",
"description": "Comma seperated values for default Frida scripts",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "auxiliary_hooks",
"type": "str",
"description": "Comma seperated values for auxiliary Frida scripts",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "frida_code",
"type": "str",
"description": "Frida code to load",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "mobsf_service.MobSF_Service",
"base_path": "api_app.analyzers_manager.file_analyzers",
},
"name": "mobsf_api_key",
"type": "str",
"description": "MobSF API key",
"is_secret": True,
"required": True,
},
]
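# If the sleep parameter sketched after the analyzer file is adopted, params would
# presumably gain a matching entry following this file's own pattern (sketch; the
# name, description, and default are assumptions from the review thread):
#
# {
#     "python_module": {
#         "module": "mobsf_service.MobSF_Service",
#         "base_path": "api_app.analyzers_manager.file_analyzers",
#     },
#     "name": "dynamic_analysis_wait_time",
#     "type": "int",
#     "description": "Seconds to wait after starting dynamic analysis. Default value is 30 seconds",
#     "is_secret": False,
#     "required": False,
# },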

values = []


def _get_real_obj(Model, field, value):
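    # resolve a serialized related-field value (pk, name, or nested dict) into real model instances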
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
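    # create the object if missing; return True if it already existed, False if newly created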
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
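    # create the AnalyzerConfig and, only if newly created, its Parameters and PluginConfigs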
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0064_vt_sample_download"),
("analyzers_manager", "0140_analyzerreport_analyzers_m_data_mo_a1952b_idx"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]