Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/country_workspace/contrib/aurora/admin/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@

from country_workspace.admin.base import BaseModelAdmin
from country_workspace.contrib.aurora.models import Project
from country_workspace.admin.sync import SyncAdminMixin, SyncAdminConfig, TargetConfig, Target


@admin.register(Project)
class ProjectAdmin(BaseModelAdmin):
class ProjectAdmin(SyncAdminMixin, BaseModelAdmin):
list_display = ("name", "program", "last_synced")
search_fields = ("name",)
ordering = ("name",)
autocomplete_fields = ("program",)

sync_config = SyncAdminConfig(
targets=[
TargetConfig(target=Target.PROJECTS),
TargetConfig(target=Target.REGISTRATIONS),
]
)

@admin.display(ordering="last_modified")
def last_synced(self, obj: Project) -> str:
return obj.last_modified
Expand Down
9 changes: 1 addition & 8 deletions src/country_workspace/contrib/aurora/admin/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

from country_workspace.admin.base import BaseModelAdmin
from country_workspace.contrib.aurora.models import Registration
from country_workspace.admin.sync import SyncAdminMixin, SyncAdminConfig, TargetConfig, Target


@admin.register(Registration)
class RegistrationAdmin(SyncAdminMixin, BaseModelAdmin):
class RegistrationAdmin(BaseModelAdmin):
list_display = ("name", "project", "active", "last_synced")
list_filter = (
("project", AutoCompleteFilter),
Expand All @@ -17,12 +16,6 @@ class RegistrationAdmin(SyncAdminMixin, BaseModelAdmin):
search_fields = ("name",)
ordering = ("name",)
autocomplete_fields = ("project",)
sync_config = SyncAdminConfig(
targets=[
TargetConfig(target=Target.PROJECTS),
TargetConfig(target=Target.REGISTRATIONS),
]
)

@admin.display(ordering="last_modified")
def last_synced(self, obj: Registration) -> str:
Expand Down
9 changes: 0 additions & 9 deletions src/country_workspace/contrib/aurora/exceptions.py

This file was deleted.

26 changes: 5 additions & 21 deletions src/country_workspace/contrib/aurora/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,16 @@


class ImportAuroraForm(BaseImportForm):
batch_name = forms.CharField(required=False, help_text="Label for this batch.")
registration = forms.ModelChoiceField(
queryset=Registration.objects.none(),
help_text="What type of registrations are being imported.",
)
household_column_prefix = forms.CharField(
initial="household_", help_text="Household's column group prefix", required=False
)
individuals_column_prefix = forms.CharField(
initial="individuals_",
help_text="Individuals' column group prefix",
)
household_label_column = forms.CharField(
required=False,
initial="family_name",
help_text="Which Individual's column should be used as label for the household.",
)

def __init__(self, *args: Any, program: Program | None = None, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.program = program
if program:
self.fields["registration"].queryset = Registration.objects.filter(project__program=program, active=True)
if not (program.beneficiary_group and program.beneficiary_group.master_detail):
self.fields = {
key: value
for key, value in self.fields.items()
if key not in ("household_column_prefix", "household_label_column")
}
self.fields["registration"].queryset = (
Registration.objects.select_related("project", "project__program")
.filter(project__program=program, active=True)
.order_by("name")
)
132 changes: 132 additions & 0 deletions src/country_workspace/contrib/aurora/import_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from typing import Any, NamedTuple
from itertools import chain
from collections.abc import Mapping
from functools import partial

from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from django.utils import timezone

from country_workspace.contrib.aurora.client import AuroraClient
from country_workspace.models import AsyncJob, Batch, Individual, SyncLog, Program
from country_workspace.utils.config import BatchNameConfig, ValidateModeConfig
from country_workspace.utils.fields import clean_field_names
from country_workspace.utils.sync_log import get_aurora_sync_log_name
from country_workspace.utils.functional import compose


class Config(BatchNameConfig, ValidateModeConfig):
registration_reference_pk: str | None
master_detail: bool


class ImportResult(NamedTuple):
people: int


def import_data(job: AsyncJob) -> ImportResult:
config: Config = job.config
if config.get("master_detail"):
raise NotImplementedError
if not config.get("registration_reference_pk"):
raise ImportError("registration_reference_pk is required for Aurora import")

batch = Batch.objects.create(
name=config["batch_name"],
program=job.program,
country_office=job.program.country_office,
imported_by=job.owner,
source=Batch.BatchSource.AURORA,
)

total_people = 0
client = AuroraClient()
for result in client.get(f"registration/{config['registration_reference_pk']}/records/"):
imported = import_result(batch, result, config)
total_people += imported.people
return ImportResult(people=total_people)


def import_result(batch: Batch, result: Mapping[str, Any], config: Config) -> ImportResult:
people_counter = 0
sync_log_name = get_aurora_sync_log_name(f"registration{config['registration_reference_pk']}")
program_ct = ContentType.objects.get_for_model(Program)
sync_log = SyncLog.objects.filter(name=sync_log_name, content_type=program_ct, object_id=batch.program.id).first()
last_id = int(sync_log.last_id) if sync_log and sync_log.last_id else 0
last_successful_id = last_id

try:
current_id = int(result["pk"])
if current_id <= last_id:
return ImportResult(people=0)
with transaction.atomic():
create_people(batch, result, config)
people_counter += 1
last_successful_id = current_id
except Exception as e:
failed_id = result.get("pk", "unknown (before first record)")
error_msg = (
f"Successfully imported {people_counter} people, before stopping at record {failed_id} due to:\n"
f"Error: {e}\n"
f"Last successful record ID: {last_successful_id}."
)
raise ImportError(error_msg) from e
finally:
if last_successful_id > last_id:
SyncLog.objects.update_or_create(
name=sync_log_name,
content_type=program_ct,
object_id=batch.program.id,
defaults={"last_id": str(last_successful_id), "last_update_date": timezone.now()},
)
return ImportResult(people=people_counter)


def create_people(batch: Batch, record: dict[str, Any], config: Config) -> Individual:
transform_individual_row = compose(
flatten_top2_prefixed,
clean_field_names,
partial(batch.program.apply_mapping_importer, Individual),
make_full_name,
)
return Individual.objects.create(
batch_id=batch.pk,
name="",
household=None,
flex_fields=transform_individual_row(record["fields"]),
raw_data=record,
)


def flatten_top2_prefixed(
data: Mapping[str, Any],
sep: str = "_",
) -> dict[str, Any]:
"""Flatten top level; prefix-expand second-level dicts by parent key; ignore deeper nesting."""
out: dict[str, Any] = {}

def ld2d(items: list[Mapping[str, Any]]) -> dict[str, Any]:
return dict(chain.from_iterable(d.items() for d in items))

def merge(d: Mapping[str, Any]) -> None:
for k, v in d.items():
if isinstance(v, Mapping): # 2nd level dict
out.update({f"{k}{sep}{kk}": vv for kk, vv in v.items()})
elif isinstance(v, list) and all(isinstance(it, Mapping) for it in v): # list[dict]
merge(ld2d(v))
else:
out[k] = v

merge(data)
return out


def make_full_name(row: dict[str, Any]) -> dict[str, Any]: # pragma: no cover
if (row.get("full_name") or "").strip():
return row

parts = [(row.get(k) or "").strip() for k in ("given_name", "middle_name", "family_name")]
if full := " ".join(p for p in parts if p):
row["full_name"] = full

return row
Loading
Loading