Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "AILTypoSquatting",
"description": "[AILTypoSquatting](https://github.com/typosquatter/ail-typo-squatting) is a Python library to generate list of potential typo squatting domains with domain name permutation engine to feed AIL and other systems.",
"disabled": False,
"soft_time_limit": 60,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "CLEAR",
"observable_supported": ["domain"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "omission",
"type": "bool",
"description": "omission for AilTypoSquatting",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "subdomain",
"type": "bool",
"description": "subdomain for AilTypoSquatting",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "addDash",
"type": "bool",
"description": "addDash for AilTypoSquatting",
"is_secret": False,
"required": False,
},
{
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "runall",
"type": "bool",
"description": "runall for AilTypoSquatting",
"is_secret": False,
"required": False,
},
]

values = [
{
"parameter": {
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "omission",
"type": "bool",
"description": "omission for AilTypoSquatting",
"is_secret": False,
"required": False,
},
"analyzer_config": "AILTypoSquatting",
"connector_config": None,
"visualizer_config": None,
"ingestor_config": None,
"pivot_config": None,
"for_organization": False,
"value": False,
"updated_at": "2024-05-26T00:10:15.236358Z",
"owner": None,
},
{
"parameter": {
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "subdomain",
"type": "bool",
"description": "subdomain for AilTypoSquatting",
"is_secret": False,
"required": False,
},
"analyzer_config": "AILTypoSquatting",
"connector_config": None,
"visualizer_config": None,
"ingestor_config": None,
"pivot_config": None,
"for_organization": False,
"value": False,
"updated_at": "2024-05-26T00:10:15.239951Z",
"owner": None,
},
{
"parameter": {
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "addDash",
"type": "bool",
"description": "addDash for AilTypoSquatting",
"is_secret": False,
"required": False,
},
"analyzer_config": "AILTypoSquatting",
"connector_config": None,
"visualizer_config": None,
"ingestor_config": None,
"pivot_config": None,
"for_organization": False,
"value": False,
"updated_at": "2024-05-26T00:10:15.242347Z",
"owner": None,
},
{
"parameter": {
"python_module": {
"module": "ailtyposquatting.AilTypoSquatting",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "runall",
"type": "bool",
"description": "runall for AilTypoSquatting",
"is_secret": False,
"required": False,
},
"analyzer_config": "AILTypoSquatting",
"connector_config": None,
"visualizer_config": None,
"ingestor_config": None,
"pivot_config": None,
"for_organization": False,
"value": True,
"updated_at": "2024-05-26T00:10:15.166020Z",
"owner": None,
},
]


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0062_alter_parameter_python_module"),
("analyzers_manager", "0090_analyzer_config_cycat"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
58 changes: 58 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/ailtyposquatting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
import math

from ail_typo_squatting import addDash, omission, subdomain

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.exceptions import AnalyzerRunException

logger = logging.getLogger(__name__)


class AilTypoSquatting(classes.ObservableAnalyzer):
"""
wrapper for https://github.com/typosquatter/ail-typo-squatting
"""

omission: bool = False
subdomain: bool = False
addDash: bool = False
runall: bool = True

def update(self) -> bool:
pass

def run(self):
resultList = []
response = {}
limit = math.inf
if self.runall or self.omission:
response["omission"] = omission(
domain=self.observable_name,
resultList=resultList,
verbose=False,
limit=limit,
givevariations=False,
keeporiginal=False,
)
if self.runall or self.subdomain:
response["subdomain"] = subdomain(
domain=self.observable_name,
resultList=resultList,
verbose=False,
limit=limit,
givevariations=False,
keeporiginal=False,
)
if self.runall or self.addDash:
response["addDash"] = addDash(
domain=self.observable_name,
resultList=resultList,
verbose=False,
limit=limit,
givevariations=False,
keeporiginal=False,
)
else:
raise AnalyzerRunException("No algo selected to run")
return response
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.add(AnalyzerConfig.objects.get(name="AILTypoSquatting").id)
pc.full_clean()
pc.save()

pc = playbook_config.objects.get(name="Sample_Static_Analysis")
pc.analyzers.add(AnalyzerConfig.objects.get(name="AILTypoSquatting").id)
pc.full_clean()
pc.save()


def reverse_migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.remove(AnalyzerConfig.objects.get(name="AILTypoSquatting").id)
pc.full_clean()
pc.save()

pc = playbook_config.objects.get(name="Sample_Static_Analysis")
pc.analyzers.remove(AnalyzerConfig.objects.get(name="AILTypoSquatting").id)
pc.full_clean()
pc.save()


class Migration(migrations.Migration):
dependencies = [
(
"playbooks_manager",
"0044_add_cycat_to_free_to_use",
),
("analyzers_manager", "0091_analyzer_config_ailtyposquatting"),
]

operations = [
migrations.RunPython(migrate, reverse_migrate),
]
3 changes: 2 additions & 1 deletion docs/source/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ The following is the list of the available analyzers you can run out-of-the-box.
* `TweetFeed`: [TweetFeed](https://tweetfeed.live/) collects Indicators of Compromise (IOCs) shared by the infosec community at Twitter.\r\nHere you will find malicious URLs, domains, IPs, and SHA256/MD5 hashes.
* `HudsonRock`: [Hudson Rock](https://cavalier.hudsonrock.com/docs) provides its clients the ability to query a database of over 27,541,128 computers which were compromised through global info-stealer campaigns performed by threat actors.
* `CyCat`: [CyCat](https://cycat.org/) or the CYbersecurity Resource CATalogue aims at mapping and documenting, in a single formalism and catalogue available cybersecurity tools, rules, playbooks, processes and controls.

* `AILTypoSquatting`:[AILTypoSquatting](https://github.com/typosquatter/ail-typo-squatting) is a Python library to generate list of potential typo squatting domains with domain name permutation engine to feed AIL and other systems.

##### Generic analyzers (email, phone number, etc.; anything really)

Some analyzers require details other than just IP, URL, Domain, etc. We classified them as `generic` Analyzers. Since the type of field is not known, there is a format for strings to be followed.
Expand Down
2 changes: 1 addition & 1 deletion requirements/project-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ querycontacts==2.0.0
blint==2.1.5
hfinger==0.2.2
permhash==0.1.4

ail_typo_squatting==2.7.4
# this is required because XLMMacroDeobfuscator does not pin the following packages
pyxlsb2==0.0.8
xlrd2==1.3.4
Expand Down