-
-
Notifications
You must be signed in to change notification settings - Fork 529
fixes #1699 Permhash analyzer #2258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
25405b1
5aeb924
e4ae82d
58db127
72f93af
1f47136
48f4e5b
3575fc1
eb95b87
aef6de1
2941901
140617d
b00ee51
f0bf7c6
b9bf668
1f715c3
47348cd
f32b959
59cebe5
d0089c0
bee6b79
e48c16b
c453bac
b315469
6ac3370
0d2f470
3e81579
54d58eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
| # See the file 'LICENSE' for copying permission. | ||
|
|
||
| import logging | ||
| import os | ||
|
|
||
| from permhash import functions as permhash | ||
|
|
||
| from api_app.analyzers_manager.classes import FileAnalyzer | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Permhash(FileAnalyzer): | ||
| """ | ||
| Create permissions hash of APK, Chrome extensions, | ||
| Android manifest and Chrome extension manifest files. | ||
| """ | ||
|
|
||
| def run(self): | ||
| result = {} | ||
| _, file_extension = os.path.splitext(self.filepath) | ||
|
|
||
| logger.info(f"Started PERMHASH============================> {self.filepath}") | ||
|
|
||
| file_extension = file_extension[1:] | ||
|
|
||
| hash_val = "" | ||
|
|
||
| if file_extension == "apk": | ||
| hash_val = permhash.permhash_apk(self.filepath) | ||
| elif file_extension == "xml": | ||
| hash_val = permhash.permhash_apk_manifest(self.filepath) | ||
| elif file_extension == "crx": | ||
| hash_val = permhash.permhash_crx(self.filepath) | ||
| elif file_extension == "json": | ||
| hash_val = permhash.permhash_crx_manifest(self.filepath) | ||
| else: | ||
| result["error"] = "Invalid file extension." | ||
|
||
|
|
||
| # permhash returns False if for some reason the hash value can't be found | ||
| if hash_val: | ||
| result["hash"] = hash_val | ||
| else: | ||
| result["error"] = "Could not find permissions in the file." | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you should raise the AnalyzerRunException
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should not raise the exception here as it fails the test cases |
||
|
|
||
| return result | ||
|
|
||
| @classmethod | ||
| def update(cls) -> bool: | ||
| pass | ||
|
|
||
| @classmethod | ||
| def _monkeypatch(cls): | ||
| patches = [] | ||
|
|
||
| return super()._monkeypatch(patches=patches) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| from django.db import migrations | ||
| from django.db.models.fields.related_descriptors import ( | ||
| ForwardManyToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ManyToManyDescriptor, | ||
| ) | ||
|
|
||
| plugin = { | ||
| "python_module": { | ||
| "health_check_schedule": None, | ||
| "update_schedule": None, | ||
| "module": "perm_hash.Permhash", | ||
| "base_path": "api_app.analyzers_manager.file_analyzers", | ||
| }, | ||
| "name": "Permhash", | ||
| "description": "create a hash of permissions in APK, Android manifest, chrome extension, chrome extension manifest files using [permhash](https://github.com/google/permhash).", | ||
| "disabled": False, | ||
| "soft_time_limit": 20, | ||
| "routing_key": "default", | ||
| "health_check_status": True, | ||
| "type": "file", | ||
| "docker_based": False, | ||
| "maximum_tlp": "RED", | ||
| "observable_supported": [], | ||
| "supported_filetypes": [ | ||
| "application/vnd.android.package-archive", | ||
| "application/zip", | ||
| "application/java-archive", | ||
| "application/octet-stream", | ||
| "text/plain", | ||
| "application/x-chrome-extension", | ||
| "application/json", | ||
| ], | ||
| "run_hash": False, | ||
| "run_hash_type": "", | ||
| "not_supported_filetypes": [], | ||
| "model": "analyzers_manager.AnalyzerConfig", | ||
| } | ||
|
|
||
| params = [] | ||
|
|
||
| values = [] | ||
|
|
||
|
|
||
| def _get_real_obj(Model, field, value): | ||
| def _get_obj(Model, other_model, value): | ||
| if isinstance(value, dict): | ||
| real_vals = {} | ||
| for key, real_val in value.items(): | ||
| real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
| value = other_model.objects.get_or_create(**real_vals)[0] | ||
| # it is just the primary key serialized | ||
| else: | ||
| if isinstance(value, int): | ||
| if Model.__name__ == "PluginConfig": | ||
| value = other_model.objects.get(name=plugin["name"]) | ||
| else: | ||
| value = other_model.objects.get(pk=value) | ||
| else: | ||
| value = other_model.objects.get(name=value) | ||
| return value | ||
|
|
||
| if ( | ||
| type(getattr(Model, field)) | ||
| in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor] | ||
| and value | ||
| ): | ||
| other_model = getattr(Model, field).get_queryset().model | ||
| value = _get_obj(Model, other_model, value) | ||
| elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
| other_model = getattr(Model, field).rel.model | ||
| value = [_get_obj(Model, other_model, val) for val in value] | ||
| return value | ||
|
|
||
|
|
||
| def _create_object(Model, data): | ||
| mtm, no_mtm = {}, {} | ||
| for field, value in data.items(): | ||
| value = _get_real_obj(Model, field, value) | ||
| if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
| mtm[field] = value | ||
| else: | ||
| no_mtm[field] = value | ||
| try: | ||
| o = Model.objects.get(**no_mtm) | ||
| except Model.DoesNotExist: | ||
| o = Model(**no_mtm) | ||
| o.full_clean() | ||
| o.save() | ||
| for field, value in mtm.items(): | ||
| attribute = getattr(o, field) | ||
| if value is not None: | ||
| attribute.set(value) | ||
| return False | ||
| return True | ||
|
|
||
|
|
||
| def migrate(apps, schema_editor): | ||
| Parameter = apps.get_model("api_app", "Parameter") | ||
| PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| if not Model.objects.filter(name=plugin["name"]).exists(): | ||
| exists = _create_object(Model, plugin) | ||
| if not exists: | ||
| for param in params: | ||
| _create_object(Parameter, param) | ||
| for value in values: | ||
| _create_object(PluginConfig, value) | ||
|
|
||
|
|
||
| def reverse_migrate(apps, schema_editor): | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| Model.objects.get(name=plugin["name"]).delete() | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| atomic = False | ||
| dependencies = [ | ||
| ("api_app", "0062_alter_parameter_python_module"), | ||
| ("analyzers_manager", "0077_analyzer_config_abusix"), | ||
| ] | ||
|
|
||
| operations = [migrations.RunPython(migrate, reverse_migrate)] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,6 +84,8 @@ class MimeTypes(models.TextChoices): | |
| XPKCS7 = "application/x-pkcs7-signature" | ||
| MIXED = "multipart/mixed" | ||
| X_SHELLSCRIPT = "text/x-shellscript" | ||
| CRX = "application/x-chrome-extension" | ||
| JSON = "application/json" | ||
|
|
||
|
Comment on lines
+87
to
89
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you are changing a model, you need to do a migration for this change too |
||
| @classmethod | ||
| def _calculate_from_filename(cls, file_name: str) -> Optional["MimeTypes"]: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why check the file extension when you can use the mimetype?