-
-
Notifications
You must be signed in to change notification settings - Fork 529
ailtyposquatting #2341
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ailtyposquatting #2341
Changes from 11 commits
9e8f010
caca7c8
67f7a3a
19b4174
70ebe57
dfe0f4c
f42aadc
bf20a38
e867116
327c74e
0ed1ca7
903b2d6
6ec155f
48478c8
a32ad47
fe4269d
41703c8
6974eca
79443ce
8d29953
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| from django.db import migrations | ||
| from django.db.models.fields.related_descriptors import ( | ||
| ForwardManyToOneDescriptor, | ||
| ForwardOneToOneDescriptor, | ||
| ManyToManyDescriptor, | ||
| ) | ||
|
|
||
| plugin = { | ||
| "python_module": { | ||
| "health_check_schedule": None, | ||
| "update_schedule": None, | ||
| "module": "ailtyposquatting.AilTypoSquatting", | ||
| "base_path": "api_app.analyzers_manager.observable_analyzers", | ||
| }, | ||
| "name": "AILTypoSquatting", | ||
| "description": "[AILTypoSquatting](https://github.com/typosquatter/ail-typo-squatting) is a Python library to generate list of potential typo squatting domains with domain name permutation engine to feed AIL and other systems.", | ||
| "disabled": False, | ||
| "soft_time_limit": 60, | ||
| "routing_key": "default", | ||
| "health_check_status": True, | ||
| "type": "observable", | ||
| "docker_based": False, | ||
| "maximum_tlp": "RED", | ||
| "observable_supported": ["domain"], | ||
| "supported_filetypes": [], | ||
| "run_hash": False, | ||
| "run_hash_type": "", | ||
| "not_supported_filetypes": [], | ||
| "model": "analyzers_manager.AnalyzerConfig", | ||
| } | ||
|
|
||
| params = [ | ||
| { | ||
| "python_module": { | ||
| "module": "ailtyposquatting.AilTypoSquatting", | ||
| "base_path": "api_app.analyzers_manager.observable_analyzers", | ||
| }, | ||
| "name": "dns_resolving", | ||
| "type": "bool", | ||
| "description": "dns_resolving for AilTypoSquatting; only works for TLP CLEAR", | ||
| "is_secret": False, | ||
| "required": False, | ||
| }, | ||
| ] | ||
| values = [ | ||
| { | ||
| "parameter": { | ||
| "python_module": { | ||
| "module": "ailtyposquatting.AilTypoSquatting", | ||
| "base_path": "api_app.analyzers_manager.observable_analyzers", | ||
| }, | ||
| "name": "dns_resolving", | ||
| "type": "bool", | ||
| "description": "dns_resolving for AilTypoSquatting; only works for TLP CLEAR", | ||
| "is_secret": False, | ||
| "required": False, | ||
| }, | ||
| "analyzer_config": "AILTypoSquatting", | ||
| "connector_config": None, | ||
| "visualizer_config": None, | ||
| "ingestor_config": None, | ||
| "pivot_config": None, | ||
| "for_organization": False, | ||
| "value": False, | ||
| "updated_at": "2024-05-26T00:10:15.236358Z", | ||
| "owner": None, | ||
| }, | ||
| ] | ||
|
|
||
|
|
||
| def _get_real_obj(Model, field, value): | ||
| def _get_obj(Model, other_model, value): | ||
| if isinstance(value, dict): | ||
| real_vals = {} | ||
| for key, real_val in value.items(): | ||
| real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
| value = other_model.objects.get_or_create(**real_vals)[0] | ||
| # it is just the primary key serialized | ||
| else: | ||
| if isinstance(value, int): | ||
| if Model.__name__ == "PluginConfig": | ||
| value = other_model.objects.get(name=plugin["name"]) | ||
| else: | ||
| value = other_model.objects.get(pk=value) | ||
| else: | ||
| value = other_model.objects.get(name=value) | ||
| return value | ||
|
|
||
| if ( | ||
| type(getattr(Model, field)) | ||
| in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor] | ||
| and value | ||
| ): | ||
| other_model = getattr(Model, field).get_queryset().model | ||
| value = _get_obj(Model, other_model, value) | ||
| elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
| other_model = getattr(Model, field).rel.model | ||
| value = [_get_obj(Model, other_model, val) for val in value] | ||
| return value | ||
|
|
||
|
|
||
| def _create_object(Model, data): | ||
| mtm, no_mtm = {}, {} | ||
| for field, value in data.items(): | ||
| value = _get_real_obj(Model, field, value) | ||
| if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
| mtm[field] = value | ||
| else: | ||
| no_mtm[field] = value | ||
| try: | ||
| o = Model.objects.get(**no_mtm) | ||
| except Model.DoesNotExist: | ||
| o = Model(**no_mtm) | ||
| o.full_clean() | ||
| o.save() | ||
| for field, value in mtm.items(): | ||
| attribute = getattr(o, field) | ||
| if value is not None: | ||
| attribute.set(value) | ||
| return False | ||
| return True | ||
|
|
||
|
|
||
| def migrate(apps, schema_editor): | ||
| Parameter = apps.get_model("api_app", "Parameter") | ||
| PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| if not Model.objects.filter(name=plugin["name"]).exists(): | ||
| exists = _create_object(Model, plugin) | ||
| if not exists: | ||
| for param in params: | ||
| _create_object(Parameter, param) | ||
| for value in values: | ||
| _create_object(PluginConfig, value) | ||
|
|
||
|
|
||
| def reverse_migrate(apps, schema_editor): | ||
| python_path = plugin.pop("model") | ||
| Model = apps.get_model(*python_path.split(".")) | ||
| Model.objects.get(name=plugin["name"]).delete() | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| atomic = False | ||
| dependencies = [ | ||
| ("api_app", "0062_alter_parameter_python_module"), | ||
| ("analyzers_manager", "0091_analyzer_config_vulners"), | ||
| ] | ||
|
|
||
| operations = [migrations.RunPython(migrate, reverse_migrate)] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| import logging | ||
| import math | ||
|
|
||
| from ail_typo_squatting import runAll | ||
| from ail_typo_squatting.dns_local.resolving import dnsResolving | ||
|
|
||
| from api_app.analyzers_manager import classes | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class AilTypoSquatting(classes.ObservableAnalyzer): | ||
| """ | ||
| wrapper for https://github.com/typosquatter/ail-typo-squatting | ||
| """ | ||
|
|
||
| dns_resolving: bool = False | ||
|
|
||
| def update(self) -> bool: | ||
| pass | ||
|
|
||
| def run(self): | ||
| response = {} | ||
| logger.info( | ||
| f"""running AilTypoSquatting on {self.observable_name} | ||
| with tlp {self._job.tlp} | ||
| and dns resolving {self.dns_resolving}""" | ||
| ) | ||
| resultList = [] | ||
| response["algorithms"] = runAll( | ||
| domain=self.observable_name, | ||
| limit=math.inf, | ||
| formatoutput="yara", | ||
| pathOutput=None, | ||
| ) | ||
| if self._job.tlp == "CLEAR" and self.dns_resolving: | ||
|
||
| response["dnsResolving"] = dnsResolving( | ||
|
||
| resultList=resultList, domain=self.observable_name, pathOutput=None | ||
| ) | ||
|
|
||
| return response | ||
mlodic marked this conversation as resolved.
Show resolved
Hide resolved
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please show the results from this? cause "formatoutput"="yara" I don't think it is the right option
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I mean it does not make sense to me cause I should have expected "Yara" code output. But ok, nevermind, the important thing is that it works. Maybe Yara would be used in the file written to the disk be but we are not generating it.
So, it could make sense to set it as None too to avoid confusion to the next reader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
says :
known format: None. Will use text format insteadcommitting with
textonly.