Merged
Changes from 15 commits
2 changes: 2 additions & 0 deletions .gitignore
@@ -16,6 +16,8 @@ compose-elk.yml
.python_history
.viminfo

# certs
certs/

# docs
docs_env/
12 changes: 10 additions & 2 deletions api_app/documents.py
@@ -1,13 +1,17 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import logging

from django_elasticsearch_dsl import Document, fields
from django_elasticsearch_dsl.registries import registry

from .models import Job

logger = logging.getLogger(__name__)


-@registry.register_document
+@registry.register_document  # TODO: maybe we can replace this with the signal and remove django elasticsearch dsl
class JobDocument(Document):
    # Object/List fields
    analyzers_to_execute = fields.NestedField(
@@ -19,7 +23,11 @@ class JobDocument(Document):
    visualizers_to_execute = fields.NestedField(
        properties={"name": fields.KeywordField()}
    )
-    playbook_to_execute = fields.KeywordField()
+    playbook_to_execute = fields.ObjectField(
+        properties={
+            "name": fields.KeywordField(),
+        },
+    )

    # Normal fields
    errors = fields.TextField()
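With the new mapping, the playbook name lives under an object field, so searches must address its keyword sub-field by dotted path. A minimal query sketch, assuming the document class above and a hypothetical playbook name:

    # Sketch: filtering jobs by playbook name after the mapping change.
    # "my-playbook" is a hypothetical value.
    from api_app.documents import JobDocument

    search = JobDocument.search().filter(
        "term", **{"playbook_to_execute.name": "my-playbook"}
    )
    for hit in search:
        print(hit.meta.id)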
39 changes: 39 additions & 0 deletions api_app/management/commands/elastic_templates.py
@@ -0,0 +1,39 @@
import json
import logging

from django.conf import settings
from django.core.management import BaseCommand
from elasticsearch import ApiError
from elasticsearch_dsl import connections

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    # NOTE: this command is run by the uwsgi startup script

    help = "Create or update the index templates in Elasticsearch"

    def handle(self, *args, **options):
        if settings.ELASTIC_HOST:
            self.stdout.write("Creating/updating the templates...")
            # push template
            with open(
                settings.CONFIG_ROOT / "elastic_search_mappings" / "plugin_report.json"
            ) as file_content:
                try:
                    connections.get_connection().indices.put_template(
                        name="plugin-report", body=json.load(file_content)
                    )
                    success_msg = (
                        "created/updated Elasticsearch's template for plugin-report"
                    )
                    self.stdout.write(self.style.SUCCESS(success_msg))
                    logger.info(success_msg)
                except ApiError as error:
                    self.stdout.write(self.style.ERROR(error))
                    logger.critical(error)
        else:
            self.stdout.write(
                self.style.WARNING("Elasticsearch not active, templates not updated")
            )
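To confirm what the command did, the template can be read back from the cluster; a sketch, assuming the same connection settings the command uses:

    # Sketch: verifying that the "plugin-report" legacy index template exists.
    from elasticsearch_dsl import connections

    client = connections.get_connection()
    template = client.indices.get_template(name="plugin-report")
    print(template["plugin-report"]["index_patterns"])  # expected: ["plugin-report-*"]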
4 changes: 2 additions & 2 deletions api_app/queryset.py
@@ -74,7 +74,7 @@ def _create_index_template():
    ) as f:
        body = json.load(f)
        body["index_patterns"] = [f"{settings.ELASTICSEARCH_BI_INDEX}-*"]
-        settings.ELASTICSEARCH_CLIENT.indices.put_template(
+        settings.ELASTICSEARCH_BI_CLIENT.indices.put_template(
            name=settings.ELASTICSEARCH_BI_INDEX, body=body
        )
        logger.info(
@@ -105,7 +105,7 @@ def send_to_elastic_as_bi(self, max_timeout: int = 60) -> bool:
        serializer = self._get_bi_serializer_class()(instance=objects, many=True)
        objects_serialized = serializer.data
        _, errors = bulk(
-            settings.ELASTICSEARCH_CLIENT,
+            settings.ELASTICSEARCH_BI_CLIENT,
            objects_serialized,
            request_timeout=max_timeout,
        )
50 changes: 50 additions & 0 deletions configuration/elastic_search_mappings/plugin_report.json
@@ -0,0 +1,50 @@
{
    "index_patterns": [
        "plugin-report-*"
    ],
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "config": {
                "properties": {
                    "name": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword"
                            }
                        }
                    }
                }
            },
            "job": {
                "properties": {
                    "id": {
                        "type": "long"
                    }
                }
            },
            "start_time": {
                "type": "date"
            },
            "end_time": {
                "type": "date"
            },
            "status": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "report": {
                "type": "flattened"
            }
        }
    }
}
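The task in intel_owl/tasks.py writes documents of this shape into daily indices such as plugin-report-analyzer-report-<date>, which the plugin-report-* pattern above captures. A sketch of indexing one conforming document by hand (index name and values are illustrative):

    # Sketch: indexing a document that conforms to the template above.
    import datetime

    from elasticsearch_dsl import connections

    doc = {
        "config": {"name": "Classic_DNS"},  # hypothetical plugin config name
        "job": {"id": 42},
        "start_time": "2024-08-30T12:00:00Z",
        "end_time": "2024-08-30T12:00:03Z",
        "status": "SUCCESS",
        "report": {"observable": "example.com", "resolutions": []},
    }
    index = f"plugin-report-analyzer-report-{datetime.date.today()}"
    connections.get_connection().index(index=index, document=doc)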
20 changes: 20 additions & 0 deletions create_elastic_certs
@@ -0,0 +1,20 @@
#!/usr/bin/env bash
if [ ! -f ./certs/elastic_ca/ca.crt ] && [ ! -f ./certs/elastic_ca/ca.key ] && [ ! -f ./certs/elastic_instance/instance.crt ] && [ ! -f ./certs/elastic_instance/instance.key ]; then
    # start container
    docker pull docker.elastic.co/elasticsearch/elasticsearch:8.15.0 &&
    docker run -d --name elasticsearch_cert -v ./elasticsearch_instances.yml:/usr/share/elasticsearch/elasticsearch_instances.yml -it docker.elastic.co/elasticsearch/elasticsearch:8.15.0 &&
    # generate the CA
    docker exec -ti elasticsearch_cert ./bin/elasticsearch-certutil ca --pem --out ca.zip &&
    docker exec -ti elasticsearch_cert unzip ca.zip &&
    # generate a cert signed with the previously generated CA
    docker exec -ti elasticsearch_cert ./bin/elasticsearch-certutil cert --in /usr/share/elasticsearch/elasticsearch_instances.yml --pem --ca-cert ./ca/ca.crt --ca-key ./ca/ca.key --silent --out cert.zip &&
    docker exec -ti elasticsearch_cert unzip cert.zip &&
    # extract the files from the container
    docker cp elasticsearch_cert:/usr/share/elasticsearch/ca ./certs/elastic_ca &&
    docker cp elasticsearch_cert:/usr/share/elasticsearch/elasticsearch ./certs/elastic_instance &&
    # stop and remove the container
    docker kill elasticsearch_cert &&
    docker rm elasticsearch_cert
else
    echo "certificate files already exist"
fi
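Before wiring the output into compose, the generated CA can be sanity-checked from Python; a sketch, assuming the cryptography package is installed:

    # Sketch: inspecting the CA certificate produced by create_elastic_certs.
    from cryptography import x509

    with open("certs/elastic_ca/ca.crt", "rb") as f:
        ca = x509.load_pem_x509_certificate(f.read())
    print(ca.subject.rfc4514_string(), ca.not_valid_after)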
33 changes: 22 additions & 11 deletions docker/elasticsearch.override.yml
@@ -2,18 +2,29 @@ services:
  uwsgi:
    depends_on:
      - elasticsearch
    volumes:
      - ../certs:/opt/deploy/intel_owl/certs

  elasticsearch:
-    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.0
    container_name: intelowl_elasticsearch
    healthcheck:
      test: ["CMD-SHELL", "nc -z localhost 9200 || exit 1"]
      interval: 5s
      timeout: 2s
      start_period: 2s
      retries: 6
    env_file:
      - env_file_app
    volumes:
      - elastic_data:/usr/share/elasticsearch/data
      - ../certs:/usr/share/elasticsearch/config/certificates
    environment:
-      - "discovery.type=single-node"
-
-  kibana:
-    image: docker.elastic.co/kibana/kibana:7.17.0
-    environment:
-      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
-    ports:
-      - '5601:5601'
-    depends_on:
-      - elasticsearch
      - discovery.type=single-node
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=/usr/share/elasticsearch/config/certificates/elastic_instance/elasticsearch.key
      - xpack.security.http.ssl.certificate_authorities=/usr/share/elasticsearch/config/certificates/elastic_ca/ca.crt
      - xpack.security.http.ssl.certificate=/usr/share/elasticsearch/config/certificates/elastic_instance/elasticsearch.crt

volumes:
  elastic_data:
3 changes: 3 additions & 0 deletions docker/entrypoints/uwsgi.sh
@@ -28,6 +28,7 @@ echo "DEBUG: " $DEBUG
echo "DJANGO_TEST_SERVER: " $DJANGO_TEST_SERVER
echo "------------------------------"
CHANGELOG_NOTIFICATION_COMMAND='python manage.py changelog_notification .github/CHANGELOG.md INTELOWL --number-of-releases 3'
ELASTIC_TEMPLATE_COMMAND='python manage.py elastic_templates'

if [[ $DEBUG == "True" ]] && [[ $DJANGO_TEST_SERVER == "True" ]];
then
@@ -43,8 +44,10 @@
fi

$CHANGELOG_NOTIFICATION_COMMAND --debug
$ELASTIC_TEMPLATE_COMMAND
python manage.py runserver 0.0.0.0:8001
else
$CHANGELOG_NOTIFICATION_COMMAND
$ELASTIC_TEMPLATE_COMMAND
/usr/local/bin/uwsgi --ini /etc/uwsgi/sites/intel_owl.ini --stats 127.0.0.1:1717 --stats-http
fi
4 changes: 2 additions & 2 deletions docker/env_file_app_template
@@ -53,8 +53,8 @@ SLACK_TOKEN=
DEFAULT_SLACK_CHANNEL=

# Elastic Search Configuration
-ELASTICSEARCH_DSL_ENABLED=False
-ELASTICSEARCH_DSL_HOST=
+ELASTIC_HOST=
+ELASTIC_PASSWORD=
# consult to: https://django-elasticsearch-dsl.readthedocs.io/en/latest/settings.html
ELASTICSEARCH_DSL_NO_OF_SHARDS=1
ELASTICSEARCH_DSL_NO_OF_REPLICAS=0
2 changes: 2 additions & 0 deletions elasticsearch_instances.yml
@@ -0,0 +1,2 @@
instances:
- name: elasticsearch
8 changes: 8 additions & 0 deletions intel_owl/celery.py
@@ -134,6 +134,14 @@ def get_queue_name(queue: str) -> str:
"MessageGroupId": str(uuid.uuid4()),
},
},
"send_plugin_report_to_elastic": {
"task": "send_plugin_report_to_elastic",
"schedule": crontab(minute="*/5"),
"options": {
"queue": get_queue_name(settings.DEFAULT_QUEUE),
"MessageGroupId": str(uuid.uuid4()),
},
},
"remove_old_jobs": {
"task": "intel_owl.tasks.remove_old_jobs",
"schedule": crontab(minute=10, hour=2),
31 changes: 20 additions & 11 deletions intel_owl/settings/elasticsearch.py
@@ -16,25 +16,34 @@
print("Elasticsearch not correctly configured")

else:
ELASTICSEARCH_CLIENT = Elasticsearch(
ELASTICSEARCH_BI_CLIENT = Elasticsearch(
ELASTICSEARCH_BI_HOST,
maxsize=20,
max_retries=10,
retry_on_timeout=True,
timeout=30,
)
if not ELASTICSEARCH_CLIENT.ping():
print("ELASTICSEARCH client configuration did not connect correctly")
if not ELASTICSEARCH_BI_CLIENT.ping():
print(
f"ELASTICSEARCH BI client configuration did not connect correctly: {ELASTICSEARCH_BI_CLIENT.info()}"
)

ELASTICSEARCH_DSL_ENABLED = (
secrets.get_secret("ELASTICSEARCH_DSL_ENABLED", False) == "True"
)
if ELASTICSEARCH_DSL_ENABLED:
ELASTICSEARCH_DSL_HOST = secrets.get_secret("ELASTICSEARCH_DSL_HOST")
ELASTIC_HOST = secrets.get_secret("ELASTIC_HOST")
if ELASTIC_HOST:
elastic_client_settings = {"hosts": ELASTIC_HOST}

ELASTIC_PASSWORD = secrets.get_secret("ELASTIC_PASSWORD")
if ELASTIC_PASSWORD:
elastic_client_settings["basic_auth"] = ("elastic", ELASTIC_PASSWORD)
ca_path = "/opt/deploy/intel_owl/certs/elastic_ca/ca.crt"
cert_path = "/opt/deploy/intel_owl/certs/elastic_instance/elasticsearch.crt"
if "elasticsearch:9200" in ELASTIC_HOST:
# in case we use Elastic as container we need the generated
# in case we use Elastic as external service it should have a valid cert
elastic_client_settings["verify_certs"] = cert_path
elastic_client_settings["ca_certs"] = ca_path
ELASTICSEARCH_DSL = {"default": elastic_client_settings}

ELASTICSEARCH_DSL = {
"default": {"hosts": ELASTICSEARCH_DSL_HOST},
}
ELASTICSEARCH_DSL_INDEX_SETTINGS = {
"number_of_shards": int(secrets.get_secret("ELASTICSEARCH_DSL_NO_OF_SHARDS")),
"number_of_replicas": int(
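Taken together, the settings above assemble a client equivalent to this sketch for the containerized deployment (the password is a placeholder; host and CA path mirror the values used elsewhere in this PR):

    # Sketch: the client configuration the settings above produce.
    from elasticsearch import Elasticsearch

    client = Elasticsearch(
        "https://elasticsearch:9200",
        basic_auth=("elastic", "changeme"),  # placeholder password
        verify_certs=True,
        ca_certs="/opt/deploy/intel_owl/certs/elastic_ca/ca.crt",
    )
    print(client.ping())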
62 changes: 61 additions & 1 deletion intel_owl/tasks.py
@@ -8,16 +8,21 @@
import logging
import typing
import uuid
from typing import Dict, List

import inflection
from celery import Task, shared_task, signals
from celery.worker.consumer import Consumer
from celery.worker.control import control_command
from celery.worker.request import Request
from django.conf import settings
from django.utils.timezone import now
from django_celery_beat.models import PeriodicTask
from elasticsearch import ApiError
from elasticsearch.helpers import bulk
from elasticsearch_dsl import connections

-from api_app.choices import Status
+from api_app.choices import ReportStatus, Status
from intel_owl import secrets
from intel_owl.celery import app, get_queue_name

@@ -391,6 +396,61 @@ def send_bi_to_elastic(max_timeout: int = 60, max_objects: int = 10000):
    )[:max_objects].send_to_elastic_as_bi(max_timeout=max_timeout)


@shared_task(
    base=FailureLoggedTask, name="send_plugin_report_to_elastic", soft_time_limit=300
)
def send_plugin_report_to_elastic(max_timeout: int = 60, max_objects: int = 10000):

    from api_app.analyzers_manager.models import AnalyzerReport
    from api_app.connectors_manager.models import ConnectorReport
    from api_app.ingestors_manager.models import IngestorReport
    from api_app.models import AbstractReport
    from api_app.pivots_manager.models import PivotReport
    from api_app.visualizers_manager.models import VisualizerReport

    def _convert_report_to_elastic_document(
        _class: typing.Type[AbstractReport],
    ) -> List[Dict]:
        upper_threshold = now().replace(second=0, microsecond=0)
        lower_threshold = upper_threshold - datetime.timedelta(minutes=5)
Review thread on the thresholds above:

Member: Timedeltas should not be calculated inside async functions; they should be calculated beforehand, to avoid the value changing in case of congestion.

Contributor Author: I think it's correct to calculate it inside the task: the alternative is to put it in the beat schedule, but that doesn't work because the function is called once, when the schedule is defined, so the time range would be the same for all the scheduled tasks. Am I wrong?

Member: No, you're right. If I remember correctly, on another occasion we handled this case by calculating the time from an element of the database. That way there is no way of getting this value wrong, because the task changes it only at execution time, so if there are any downtimes there is no loss of data. (I am afraid of the sync becoming misaligned and losing some data from time to time; that would make data analysis really bad.) I would still get the time from "now" first, but instead of subtracting 5 minutes I would use the last update read from the database as the lower_threshold.

Contributor Author: I'm not sure; I remember what you are talking about, but I didn't find it. I found some capped collections used to repeat a task in case of failure, which is similar but not the same. However, I found a way to do it with postgres, so I'll proceed with the merge.
        report_list: List[AbstractReport] = _class.objects.filter(
            status__in=ReportStatus.final_statuses(),
            end_time__lt=upper_threshold,
            end_time__gte=lower_threshold,
        )
        return [
            {
                "_op_type": "index",
                "_index": (
                    "plugin-report-"
                    f"{inflection.underscore(_class.__name__).replace('_', '-')}-"
                    f"{datetime.date.today()}"
                ),
                "_source": {
                    "config": {"name": report.config.name},
                    "job": {"id": report.job.id},
                    "start_time": report.start_time,
                    "end_time": report.end_time,
                    "status": report.status,
                    "report": report.report,
                },
            }
            for report in report_list
        ]

    # add documents
    document_list = (
        _convert_report_to_elastic_document(AnalyzerReport)
        + _convert_report_to_elastic_document(ConnectorReport)
        + _convert_report_to_elastic_document(IngestorReport)
        + _convert_report_to_elastic_document(PivotReport)
        + _convert_report_to_elastic_document(VisualizerReport)
    )
    logger.info(f"documents to add to elastic: {len(document_list)}")
    try:
        bulk(connections.get_connection(), document_list)
    except ApiError as error:
        logger.critical(error)
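The five-minute filter window lines up with the crontab(minute="*/5") schedule added in intel_owl/celery.py: each run truncates "now" to the whole minute and takes the preceding five minutes, so consecutive runs tile the timeline without gaps or overlap. A worked example of the arithmetic:

    # Sketch: the window selected by a run that fires shortly after 12:05.
    import datetime

    fired_at = datetime.datetime(2024, 8, 30, 12, 5, 7)  # illustrative wall-clock time
    upper_threshold = fired_at.replace(second=0, microsecond=0)  # 12:05:00
    lower_threshold = upper_threshold - datetime.timedelta(minutes=5)  # 12:00:00
    # Reports with 12:00:00 <= end_time < 12:05:00 are shipped by this run;
    # the 12:10 run then covers [12:05, 12:10), and so on.
    assert upper_threshold - lower_threshold == datetime.timedelta(minutes=5)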


@shared_task(
    base=FailureLoggedTask,
    name="enable_configuration_for_org_for_rate_limit",