Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

# Serialized AnalyzerConfig row inserted by this data migration; the
# "model" key selects the target model, every other key is field data.
plugin = {
    "python_module": {
        # no periodic health check for this analyzer
        "health_check_schedule": None,
        # crontab-style schedule: refresh the local DROP database daily at 00:00
        "update_schedule": {
            "minute": "0",
            "hour": "0",
            "day_of_week": "*",
            "day_of_month": "*",
            "month_of_year": "*",
        },
        "module": "spamhaus_drop.SpamhausDropV4",
        "base_path": "api_app.analyzers_manager.observable_analyzers",
    },
    "name": "Spamhaus_DROP",
    "description": "[Spamhaus_DROP](https://www.spamhaus.org/blocklists/do-not-route-or-peer/) protects from activity directly originating from rogue networks, such as spam campaigns, encryption via ransomware, DNS-hijacking and exploit attempts, authentication attacks to discover working access credentials, harvesting, DDoS attacks.",
    "disabled": False,
    "soft_time_limit": 10,
    "routing_key": "default",
    "health_check_status": True,
    "type": "observable",
    "docker_based": False,
    "maximum_tlp": "AMBER",
    "observable_supported": ["generic"],
    "supported_filetypes": [],
    "run_hash": False,
    "run_hash_type": "",
    "not_supported_filetypes": [],
    "model": "analyzers_manager.AnalyzerConfig",
}

# No extra Parameter rows are needed for this analyzer.
params = []

# No default PluginConfig values are needed for this analyzer.
values = []


def _get_real_obj(Model, field, value):
    """Resolve a serialized field value into real related object(s).

    Foreign-key / one-to-one values become a single model instance;
    many-to-many values become a list of instances. Non-relational
    values (and falsy ones) are returned unchanged.
    """

    def _resolve_one(model_cls, related_model, raw):
        # A dict describes a nested object: resolve each key recursively,
        # then get-or-create the related row from the resolved values.
        if isinstance(raw, dict):
            resolved = {
                key: _get_real_obj(related_model, key, nested)
                for key, nested in raw.items()
            }
            return related_model.objects.get_or_create(**resolved)[0]
        # An int is just the primary key serialized — except for
        # PluginConfig, which links to its plugin by name instead.
        if isinstance(raw, int):
            if model_cls.__name__ == "PluginConfig":
                return related_model.objects.get(name=plugin["name"])
            return related_model.objects.get(pk=raw)
        # Anything else is looked up by its unique "name".
        return related_model.objects.get(name=raw)

    descriptor = getattr(Model, field)
    # Exact-type checks (not isinstance) to mirror Django's descriptor kinds.
    descriptor_type = type(descriptor)
    if (
        descriptor_type in (ForwardManyToOneDescriptor, ForwardOneToOneDescriptor)
        and value
    ):
        related_model = descriptor.get_queryset().model
        value = _resolve_one(Model, related_model, value)
    elif descriptor_type is ManyToManyDescriptor and value:
        related_model = descriptor.rel.model
        value = [_resolve_one(Model, related_model, item) for item in value]
    return value


def _create_object(Model, data):
    """Get-or-create a Model row from serialized ``data``.

    Returns True when a matching row already existed, False when a new
    row was created (callers use this to decide whether dependent
    objects still need to be inserted).
    """
    many_to_many, scalar = {}, {}
    for field, raw in data.items():
        resolved = _get_real_obj(Model, field, raw)
        # M2M values cannot be passed to the constructor; set them after save.
        if type(getattr(Model, field)) is ManyToManyDescriptor:
            many_to_many[field] = resolved
        else:
            scalar[field] = resolved
    try:
        Model.objects.get(**scalar)
    except Model.DoesNotExist:
        obj = Model(**scalar)
        obj.full_clean()
        obj.save()
        for field, related in many_to_many.items():
            if related is not None:
                getattr(obj, field).set(related)
        return False
    return True


def migrate(apps, schema_editor):
    """Forward data migration: insert the analyzer config declared above,
    plus any Parameter / PluginConfig rows it needs."""
    parameter_model = apps.get_model("api_app", "Parameter")
    plugin_config_model = apps.get_model("api_app", "PluginConfig")
    # "model" names the target model as "<app_label>.<ModelName>".
    app_label, model_name = plugin.pop("model").split(".")
    target_model = apps.get_model(app_label, model_name)
    if target_model.objects.filter(name=plugin["name"]).exists():
        return
    already_existed = _create_object(target_model, plugin)
    if already_existed:
        return
    for param in params:
        _create_object(parameter_model, param)
    for value in values:
        _create_object(plugin_config_model, value)


def reverse_migrate(apps, schema_editor):
    """Backward data migration: delete the row created by ``migrate``.

    Bugfix: ``migrate()`` pops "model" out of the shared ``plugin`` dict,
    so when both directions run in the same process the key is already
    gone and a plain ``pop`` would raise KeyError. Fall back to the known
    model path instead.
    """
    python_path = plugin.pop("model", "analyzers_manager.AnalyzerConfig")
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    # Data migration executed via RunPython, not schema DDL.
    # NOTE(review): atomic is disabled, presumably so the data inserts are
    # not wrapped in one transaction — confirm against sibling migrations.
    atomic = False
    dependencies = [
        ("api_app", "0062_alter_parameter_python_module"),
        (
            "analyzers_manager",
            "0105_alter_analyzerconfig_not_supported_filetypes_and_more",
        ),
    ]

    operations = [migrations.RunPython(migrate, reverse_migrate)]
101 changes: 101 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/spamhaus_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import json
import logging
import os
import re

import requests
from django.conf import settings

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.exceptions import AnalyzerRunException
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

logger = logging.getLogger(__name__)


class SpamhausDropV4(classes.ObservableAnalyzer):
    """Checks whether an IPv4 CIDR observable appears in the Spamhaus
    DROP (Do not Route Or Peer) v4 blocklist.

    The blocklist is cached on disk under ``settings.MEDIA_ROOT`` and
    refreshed by :meth:`update`; :meth:`run` reads only the local copy,
    downloading it first if it is missing.
    """

    url = "https://www.spamhaus.org/drop/drop_v4.json"

    @classmethod
    def location(cls) -> str:
        """Return the filesystem path of the cached DROP database."""
        db_name = "drop_v4.json"
        return f"{settings.MEDIA_ROOT}/{db_name}"

    @staticmethod
    def is_valid_cidr(cidr) -> bool:
        """Syntactic check that *cidr* looks like an IPv4 CIDR (a.b.c.d/0-32).

        NOTE: octet values are not range-checked ("999.0.0.0/8" passes);
        only the overall shape and the prefix length are validated.
        """
        # Bugfix: the previous prefix alternation (0|[1-2][0-9]|3[0-2])
        # rejected valid single-digit prefix lengths such as /8.
        cidr_pattern = r"^([0-9]{1,3}\.){3}[0-9]{1,3}/([0-9]|[1-2][0-9]|3[0-2])$"
        return re.match(cidr_pattern, cidr) is not None

    def run(self):
        # "generic" observables can be anything: return a soft result for
        # non-CIDR input instead of raising.
        if not self.is_valid_cidr(self.observable_name):
            return {"not_supported": "not a valid CIDR"}
        database_location = self.location()
        if not os.path.exists(database_location):
            logger.info(
                f"Database does not exist in {database_location}, initialising..."
            )
            self.update()
        with open(database_location, "r") as f:
            db = json.load(f)
        # Linear scan over the cached entries; match on the exact CIDR string.
        for i in db:
            if i["cidr"] == self.observable_name:
                return {"found": True}
        return {"found": False}

    @classmethod
    def update(cls):
        """Download the DROP list and persist it as a JSON array on disk.

        Raises:
            requests.HTTPError: on a non-2xx response.
            AnalyzerRunException: if the payload is not line-delimited JSON.
        """
        logger.info(f"Updating database from {cls.url}")
        # Bugfix: a timeout so a stalled download cannot hang the worker forever.
        response = requests.get(url=cls.url, timeout=60)
        response.raise_for_status()
        data = cls.convert_to_json(response.text)
        database_location = cls.location()

        with open(database_location, "w", encoding="utf-8") as f:
            json.dump(data, f)
        logger.info(f"Database updated at {database_location}")

    @staticmethod
    def convert_to_json(input_string) -> list:
        """Parse the line-delimited JSON payload into a list of objects.

        Bugfix: return annotation corrected from ``dict`` — this has always
        returned a list (one parsed object per input line).

        Raises:
            AnalyzerRunException: if any line is not valid JSON.
        """
        lines = input_string.strip().split("\n")
        json_objects = []
        for line in lines:
            try:
                json_obj = json.loads(line)
                json_objects.append(json_obj)
            except json.JSONDecodeError:
                raise AnalyzerRunException(
                    "Invalid JSON format in the response while updating the database"
                )

        return json_objects

    @classmethod
    def _monkeypatch(cls):
        # Mock the HTTP download with a small static sample of DROP entries.
        patches = [
            if_mock_connections(
                patch(
                    "requests.get",
                    return_value=MockUpResponse(
                        [
                            {
                                "cidr": "1.10.16.0/20",
                                "sblid": "SBL256894",
                                "rir": "apnic",
                            },
                            {
                                "cidr": "1.19.0.0/16",
                                "sblid": "SBL434604",
                                "rir": "apnic",
                            },
                            {
                                "cidr": "1.32.128.0/18",
                                "sblid": "SBL286275",
                                "rir": "apnic",
                            },
                        ],
                        200,
                    ),
                ),
            )
        ]
        return super()._monkeypatch(patches=patches)
5 changes: 3 additions & 2 deletions docs/source/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,10 @@ The following is the list of the available analyzers you can run out-of-the-box.
* `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library.
* `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain.
* `Spamhaus_WQS`:[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol.
* `Adguard`: [Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
* `Adguard`:[Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
* `Adguard`: [Adguard](https://github.com/AdguardTeam/AdguardSDNSFilter), a filter composed of several other filters (AdGuard Base filter, Social media filter, Tracking Protection filter, Mobile Ads filter, EasyList and EasyPrivacy) and simplified specifically to be better compatible with DNS-level ad blocking.
* `JA4_DB`:[JA4_DB](https://ja4db.com/) lets you search a fingerprint in the JA4 database.

* `Spamhaus_DROP`:[Spamhaus_DROP](https://www.spamhaus.org/blocklists/do-not-route-or-peer/) protects from activity directly originating from rogue networks, such as spam campaigns, encryption via ransomware, DNS-hijacking and exploit attempts, authentication attacks to discover working access credentials, harvesting, DDoS attacks.

##### Generic analyzers (email, phone number, etc.; anything really)

Expand Down
8 changes: 5 additions & 3 deletions intel_owl/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,11 @@ def send_bi_to_elastic(max_timeout: int = 60, max_objects: int = 10000):
VisualizerReport,
]:
report_class: typing.Type[AbstractReport]
report_class.objects.filter(sent_to_bi=False).filter_completed().defer("report").order_by(
"-start_time"
)[:max_objects].send_to_elastic_as_bi(max_timeout=max_timeout)
report_class.objects.filter(sent_to_bi=False).filter_completed().defer(
"report"
).order_by("-start_time")[:max_objects].send_to_elastic_as_bi(
max_timeout=max_timeout
)
Job.objects.filter(sent_to_bi=False).filter_completed().order_by(
"-received_request_time"
)[:max_objects].send_to_elastic_as_bi(max_timeout=max_timeout)
Expand Down