Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions api_app/ingestors_manager/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
def pre_save_ingestor_config(sender, instance: IngestorConfig, *args, **kwargs):
from intel_owl.tasks import execute_ingestor

instance.user = User.objects.get_or_create(
username=f"{instance.name.title()}Ingestor"
)[0]
user = User.objects.get_or_create(username=f"{instance.name.title()}Ingestor")[0]
user.profile.task_priority = 7
user.profile.is_robot = True
user.profile.save()
instance.user = user

periodic_task = PeriodicTask.objects.update_or_create(
name=f"{instance.name.title()}Ingestor",
Expand Down
7 changes: 7 additions & 0 deletions api_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ def retry(self):
runner.apply_async(
queue=get_queue_name(settings.CONFIG_QUEUE),
MessageGroupId=str(uuid.uuid4()),
priority=self.priority,
)

def set_final_status(self) -> None:
Expand Down Expand Up @@ -540,8 +541,13 @@ def _final_status_signature(self) -> Signature:
queue=get_queue_name(settings.CONFIG_QUEUE),
immutable=True,
MessageGroupId=str(uuid.uuid4()),
priority=self.priority,
)

@property
def priority(self):
return self.user.profile.task_priority

def _get_pipeline(
self,
analyzers: PythonConfigQuerySet,
Expand Down Expand Up @@ -1199,6 +1205,7 @@ def _signature_pipeline_status(cls, job, status: str) -> Signature:
queue=get_queue_name(settings.CONFIG_QUEUE),
immutable=True,
MessageGroupId=str(uuid.uuid4()),
priority=job.priority,
)

@property
Expand Down
1 change: 1 addition & 0 deletions api_app/queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def get_signatures(self, job) -> Generator[Signature, None, None]:
task_id=task_id,
immutable=True,
MessageGroupId=str(task_id),
priority=job.priority,
)


Expand Down
1 change: 1 addition & 0 deletions api_app/serializers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ def create(self, validated_data: Dict) -> Job:
args=[job.pk],
queue=get_queue_name(settings.DEFAULT_QUEUE),
MessageGroupId=str(uuid.uuid4()),
priority=job.priority,
)

return job
Expand Down
1 change: 1 addition & 0 deletions api_app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ def perform_retry(report: AbstractReport):
queue=report.config.queue,
immutable=True,
MessageGroupId=str(uuid.uuid4()),
priority=report.job.priority,
)
runner()

Expand Down
3 changes: 3 additions & 0 deletions authentication/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ class UserProfileAdmin(admin.ModelAdmin):
"company_name",
"company_role",
"discover_from",
"task_priority",
"is_robot",
)
list_filter = ["task_priority", "is_robot"]

@admin.display(boolean=True)
def user_is_active(self, obj: UserProfile) -> bool:
Expand Down
4 changes: 4 additions & 0 deletions authentication/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@

class ApiAppAuthConfig(AppConfig):
name = "authentication"

@staticmethod
def ready() -> None:
from . import signals # noqa
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Generated by Django 4.2.11 on 2024-04-04 08:16

import django.core.validators
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("authentication", "0002_migrate_from_durin"),
]

operations = [
migrations.AddField(
model_name="userprofile",
name="is_robot",
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name="userprofile",
name="task_priority",
field=models.IntegerField(
default=10,
validators=[
django.core.validators.MaxValueValidator(10),
django.core.validators.MinValueValidator(1),
],
),
),
migrations.AlterField(
model_name="userprofile",
name="user",
field=models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
related_name="profile",
to=settings.AUTH_USER_MODEL,
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generated by Django 4.2.11 on 2024-04-04 08:27

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
(
"authentication",
"0003_userprofile_is_robot_userprofile_task_priority_and_more",
),
]

operations = [
migrations.AlterField(
model_name="userprofile",
name="company_name",
field=models.CharField(
max_length=32,
null=True,
validators=[django.core.validators.MinLengthValidator(3)],
),
),
migrations.AlterField(
model_name="userprofile",
name="company_role",
field=models.CharField(
max_length=32,
null=True,
validators=[django.core.validators.MinLengthValidator(3)],
),
),
]
28 changes: 28 additions & 0 deletions authentication/migrations/0005_create_profiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.11 on 2024-04-04 07:46

from django.conf import settings
from django.db import migrations


def migrate(apps, schema_editor):
User = apps.get_model(*settings.AUTH_USER_MODEL.split("."))
Profile = apps.get_model("authentication", "UserProfile")
for user in User.objects.all():
is_robot = user.username.endswith("Ingestor")
Profile.objects.create(
user=user, task_priority=7 if is_robot else 10, is_robot=is_robot
)


def reverse_migrate(apps, schema_editor):
Profile = apps.get_model("authentication", "UserProfile")
Profile.objects.all().delete()


class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("authentication", "0004_alter_userprofile_company_name_and_more"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
22 changes: 17 additions & 5 deletions authentication/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
# See the file 'LICENSE' for copying permission.

from django.conf import settings
from django.core.validators import MinLengthValidator
from django.core.validators import (
MaxValueValidator,
MinLengthValidator,
MinValueValidator,
)
from django.db import models

__all__ = [
Expand All @@ -24,17 +28,21 @@ class DiscoverFromChoices(models.TextChoices):


class UserProfile(models.Model):
# contants
# constants
DiscoverFromChoices = DiscoverFromChoices

# fields
user = models.OneToOneField(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="user_profile",
related_name="profile",
)
company_name = models.CharField(
max_length=32, validators=[MinLengthValidator(3)], null=True
)
company_role = models.CharField(
max_length=32, validators=[MinLengthValidator(3)], null=True
)
company_name = models.CharField(max_length=32, validators=[MinLengthValidator(3)])
company_role = models.CharField(max_length=32, validators=[MinLengthValidator(3)])
twitter_handle = models.CharField(
max_length=16, default="", blank=True, validators=[MinLengthValidator(3)]
)
Expand All @@ -43,6 +51,10 @@ class UserProfile(models.Model):
choices=DiscoverFromChoices.choices,
default=DiscoverFromChoices.OTHER,
)
task_priority = models.IntegerField(
default=10, validators=[MaxValueValidator(10), MinValueValidator(1)]
)
is_robot = models.BooleanField(default=False)

# meta
class Meta:
Expand Down
6 changes: 4 additions & 2 deletions authentication/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ def create(self, validated_data):
try:
user = super().create(validated_data)

# save profile object only if user object was actually saved
# update profile object only if user object was actually saved
if getattr(user, "pk", None):
self._profile_serializer.save(user=user)
self._profile_serializer.update(
user.profile, self._profile_serializer.data
)
user.refresh_from_db()
except DatabaseError:
transaction.rollback()
Expand Down
11 changes: 11 additions & 0 deletions authentication/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from django.conf import settings
from django.db.models.signals import post_save
from django.dispatch import receiver

from authentication.models import UserProfile


@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def post_save_user(sender, instance, created, **kwargs):
if created:
UserProfile.objects.create(user=instance)
2 changes: 1 addition & 1 deletion docker/entrypoints/uwsgi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ sleep 3
# The customization of the parameters is not applied until the migration is done
python manage.py makemigrations durin
python manage.py makemigrations rest_email_auth
python manage.py createcachetable
# fake-initial does not fake the migration if the table does not exist
python manage.py migrate --fake-initial
if ! python manage.py migrate --check
then
echo "Issue with migration exiting"
exit 1
fi
python manage.py createcachetable
# Collect static files
python manage.py collectstatic --noinput
echo "------------------------------"
Expand Down
2 changes: 2 additions & 0 deletions docs/source/Advanced-Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ Only FIFO queues are supported.

If you want to use a remote message broker (like an `ElasticCache` or `AmazonMQ` instance), you must populate the `BROKER_URL` environment variable.

It is possible to use [task priority](https://docs.celeryq.dev/en/stable/userguide/routing.html#special-routing-options) inside IntelOwl: each User has default priority of 10, and robots users (like the Ingestors) have a priority of 7.
You can customize these priorities inside Django Admin, in the `Authentication.User Profiles` section.

#### Websockets

Expand Down
11 changes: 6 additions & 5 deletions intel_owl/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ def get_queue_name(queue: str) -> str:
BROKER_TRANSPORT_OPTIONS["access_key_id"] = settings.AWS_ACCESS_KEY_ID
BROKER_TRANSPORT_OPTIONS["secret_access_key"] = settings.AWS_SECRET_ACCESS_KEY
else:
BROKER_TRANSPORT_OPTIONS = {}
BROKER_TRANSPORT_OPTIONS = {
"priority_steps": list(range(10)),
"sep": ":",
"queue_order_strategy": "priority",
}

task_queues = [
Queue(
get_queue_name(key),
routing_key=key,
)
Queue(get_queue_name(key), routing_key=key, queue_arguments={"x-max-priority": 10})
for key in settings.CELERY_QUEUES
]
if not settings.AWS_SQS:
Expand Down
15 changes: 9 additions & 6 deletions intel_owl/settings/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any, Dict

from django.core.cache.backends.db import DatabaseCache
from django.db import connections, router
from django.db import ProgrammingError, connections, router


def plain_key(key, key_prefix, version):
Expand All @@ -24,11 +24,14 @@ def get_where(self, starts_with: str, version=None) -> Dict[str, Any]:
table = connections[db].ops.quote_name(self._table)
query = self.make_and_validate_key(starts_with + "%", version=version)
with connections[db].cursor() as cursor:
cursor.execute(
f"SELECT cache_key, value, expires FROM {table} "
"WHERE cache_key LIKE %s",
[query],
)
try:
cursor.execute(
f"SELECT cache_key, value, expires FROM {table} "
"WHERE cache_key LIKE %s",
[query],
)
except ProgrammingError:
return {}
rows = cursor.fetchall()
if len(rows) < 1:
return {}
Expand Down
2 changes: 2 additions & 0 deletions tests/auth/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def test_register_201(self):
self.assertFalse(
user.is_active, msg="newly registered user must have is_active=False"
)
self.assertEqual(user.profile.company_name, "companytest")
user.delete()

def test_verify_email_200(self):
# register new user
Expand Down