Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/country_workspace/admin/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@
from django.shortcuts import render
from django.utils.translation import gettext as _

from country_workspace.models import MappingImporter, Program
from country_workspace.models import MappingImporter, Program, Transformer
from country_workspace.models.household import Household
from country_workspace.models.individual import Individual


class ReprocessForm(forms.Form):
transformer = forms.ModelChoiceField(
queryset=Transformer.objects.none(),
required=False,
label=_("Select Transformer (optional)"),
empty_label=_("No transformer"),
help_text=_("Transform values before applying mapping. Flow: transformer => mapping"),
)
mapping_importer = forms.ModelChoiceField(
queryset=MappingImporter.objects.none(),
required=True,
Expand All @@ -22,13 +29,15 @@ class ReprocessForm(forms.Form):
)

def __init__(self, *args: Any, **kwargs: Any) -> None:
queryset = kwargs.pop("queryset", MappingImporter.objects.none())
transformer_qs = kwargs.pop("transformer_queryset", Transformer.objects.none())
mapping_qs = kwargs.pop("mapping_queryset", MappingImporter.objects.none())
super().__init__(*args, **kwargs)
cast("forms.ModelChoiceField", self.fields["mapping_importer"]).queryset = queryset
cast("forms.ModelChoiceField", self.fields["transformer"]).queryset = transformer_qs
cast("forms.ModelChoiceField", self.fields["mapping_importer"]).queryset = mapping_qs


@admin.action(description=_("Reprocess records (apply mapping)"))
def reprocess_records(modeladmin: admin.ModelAdmin, request: HttpRequest, queryset: QuerySet) -> HttpResponse:
def reprocess_records(modeladmin: admin.ModelAdmin, request: HttpRequest, queryset: QuerySet) -> HttpResponse: # noqa: C901
model = queryset.model
checker_field = None

Expand All @@ -38,8 +47,17 @@ def reprocess_records(modeladmin: admin.ModelAdmin, request: HttpRequest, querys
checker_field = "individual_checker"

if "apply" in request.POST:
transformer_id = request.POST.get("transformer")
mapping_id = request.POST.get("mapping_importer")

transformer = None
if transformer_id:
try:
transformer = Transformer.objects.get(pk=transformer_id)
except Transformer.DoesNotExist:
modeladmin.message_user(request, _("Selected transformer not found."), messages.ERROR)
return HttpResponseRedirect(request.get_full_path())

if mapping_id:
try:
mapping = MappingImporter.objects.get(pk=mapping_id)
Expand All @@ -52,6 +70,8 @@ def reprocess_records(modeladmin: admin.ModelAdmin, request: HttpRequest, querys
if record.raw_data:
data = record.raw_data.copy()
mapping.apply(data)
if transformer:
transformer.apply(data)
record.flex_fields = data

record.last_checked = None
Expand All @@ -63,13 +83,17 @@ def reprocess_records(modeladmin: admin.ModelAdmin, request: HttpRequest, querys
modeladmin.message_user(request, _("Successfully reprocessed %s records.") % count, messages.SUCCESS)
return HttpResponseRedirect(request.get_full_path())

transformer_qs = Transformer.objects.none()
mapping_qs = MappingImporter.objects.none()
if checker_field:
program_ids = queryset.values_list("batch__program", flat=True).distinct()
checker_ids = Program.objects.filter(id__in=program_ids).values_list(checker_field, flat=True).distinct()
programs = Program.objects.filter(id__in=program_ids)
office_ids = programs.values_list("country_office_id", flat=True).distinct()
transformer_qs = Transformer.objects.filter(office_id__in=office_ids)
checker_ids = programs.values_list(checker_field, flat=True).distinct()
mapping_qs = MappingImporter.objects.filter(data_checker__id__in=checker_ids)

form = ReprocessForm(queryset=mapping_qs)
form = ReprocessForm(transformer_queryset=transformer_qs, mapping_queryset=mapping_qs)

context = {
**modeladmin.admin_site.each_context(request),
Expand Down
2 changes: 1 addition & 1 deletion src/country_workspace/admin/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def result(self, obj: Any) -> str:
else:
data = task_result.result

if isinstance(data, (dict, list)):
if isinstance(data, (dict | list)):
return format_html("<pre>{}</pre>", json.dumps(data, indent=2))
except (json.JSONDecodeError, TypeError):
pass
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import country_workspace.validators.mapping
import concurrency.fields
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("country_workspace", "0040_add_originating_id_to_validable"),
]

operations = [
migrations.CreateModel(
name="Transformer",
fields=[
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("last_modified", models.DateTimeField(auto_now=True)),
("version", concurrency.fields.IntegerVersionField(default=0, help_text="record revision number")),
("name", models.CharField(max_length=255)),
("description", models.CharField(blank=True, max_length=255)),
(
"value_transformations",
models.TextField(
blank=True,
default="",
help_text="Value transformation rules (one per line). Format: `fieldname:old_value=new_value`. Example: `sex:M=MALE` or `sex:F=FEMALE`.",
validators=[country_workspace.validators.mapping.ValueTransformationRulesValidator()],
),
),
("created_at", models.DateTimeField(auto_now_add=True)),
(
"created_by",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="%(class)ss",
to=settings.AUTH_USER_MODEL,
),
),
(
"office",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="transformers",
to="country_workspace.office",
help_text="Business Area (Office) this transformer belongs to",
),
),
],
options={
"verbose_name": "Transformer",
"verbose_name_plural": "Transformers",
"unique_together": {("office", "name")},
},
),
]
2 changes: 2 additions & 0 deletions src/country_workspace/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .rdp import Rdp
from .role import UserRole
from .sync import SyncLog
from .transformer import Transformer
from .user import User

__all__ = [
Expand All @@ -30,6 +31,7 @@
"Rdi",
"Rdp",
"SyncLog",
"Transformer",
"User",
"UserRole",
]
3 changes: 1 addition & 2 deletions src/country_workspace/models/mapping_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,11 @@ def rules_as_dict(self) -> dict[str, str]:
return dict(line.split("=", 1) for line in self.rules.splitlines())

def apply(self, data: dict[str, Any]) -> dict[str, Any]:
"""Apply mapping rules to transform field names."""
"""Apply field mapping to the data dictionary."""
if not self.rules:
return data

for external, internal in self.rules_as_dict.items():
if external in data:
data[internal] = data.pop(external)

return data
15 changes: 8 additions & 7 deletions src/country_workspace/models/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,20 @@ def apply_mapping_importer(
m: type[Validable] | Validable,
data: dict[str, str | int | bool],
mapping_id: int | None = None,
transformer_id: int | None = None,
) -> dict[str, str | int | bool]:
"""Apply mapping importer(s) either by explicit id or by the checker."""
from country_workspace.models import MappingImporter
"""Apply mapping importer(s) first, then transformer(s)."""
from country_workspace.models import MappingImporter, Transformer

if mapping_id is not None:
if importer := MappingImporter.objects.filter(id=mapping_id).first():
importer.apply(data)
return data
if (checker := self.get_checker_for(m)) is None:
return data
for importer in checker.mapping_importers.filter(office=self.country_office):
importer.apply(data)
elif checker := self.get_checker_for(m):
for importer in checker.mapping_importers.filter(office=self.country_office):
importer.apply(data)

if transformer_id is not None and (transformer := Transformer.objects.filter(id=transformer_id).first()):
transformer.apply(data)
return data

def get_default_fields_for(self, m: type[Validable] | Validable) -> dict[str, Any]:
Expand Down
84 changes: 84 additions & 0 deletions src/country_workspace/models/transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Any
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _

from country_workspace.models.base import BaseModel
from country_workspace.validators.mapping import ValueTransformationRulesValidator


class Transformer(BaseModel):
name = models.CharField(max_length=255)
description = models.CharField(max_length=255, blank=True)
office = models.ForeignKey(
"Office",
on_delete=models.CASCADE,
related_name="transformers",
help_text=_("Business Area (Office) this transformer belongs to"),
)
value_transformations = models.TextField(
blank=True,
default="",
validators=[ValueTransformationRulesValidator()],
help_text=_("Value transformation rules (one per line). Format: %(format)s. Example: %(example)s.")
% {"format": "`fieldname:old_value=new_value`", "example": "`sex:M=MALE` or `sex:F=FEMALE`"},
)
created_at = models.DateTimeField(auto_now_add=True)
created_by = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="%(class)ss"
)

class Meta:
verbose_name = _("Transformer")
verbose_name_plural = _("Transformers")
unique_together = [["office", "name"]]

def __str__(self) -> str:
return self.name

@property
def value_transformations_as_dict(self) -> dict[str, dict[str, str]]:
"""Parse value transformation rules into a nested dict: {fieldname: {old_value: new_value}}."""
if not self.value_transformations:
return {}

transformations: dict[str, dict[str, str]] = {}
for line_num, line in enumerate(self.value_transformations.splitlines(), start=1):
line = line.strip() # noqa: PLW2901
if not line:
continue

# Format: fieldname:old_value=new_value
if ":" not in line or "=" not in line:
raise ValueError(
f"Line {line_num}: Invalid format. Expected format: 'fieldname:old_value=new_value'. "
f"Line must contain both ':' and '=' characters. Got: {line!r}"
)

field_part, value_part = line.split(":", 1)
field_name = field_part.strip()
if "=" not in value_part:
raise ValueError(
f"Line {line_num}: Invalid format. Expected format: 'fieldname:old_value=new_value'. "
f"The value part after ':' must contain '='. Got: {line!r}"
)

old_value, new_value = (val.strip() for val in value_part.split("=", 1))
if field_name not in transformations:
transformations[field_name] = {}
transformations[field_name][old_value] = new_value

return transformations

def apply(self, data: dict[str, Any]) -> dict[str, Any]:
"""Apply value transformations to the data dictionary."""
if not self.value_transformations:
return data

transformations = self.value_transformations_as_dict
for field_name, value_map in transformations.items():
if field_name in data:
current_value = str(data[field_name])
if current_value in value_map:
data[field_name] = value_map[current_value]
return data
39 changes: 39 additions & 0 deletions src/country_workspace/validators/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,42 @@ def _validate_line(self, line: str, num: int) -> ValidationError | None:

def _error(self, num: int, message: str) -> ValidationError:
return ValidationError(_("Line %(num)d: %(message)s") % {"num": num, "message": message}, code="invalid_rule")


@deconstructible
class ValueTransformationRulesValidator:
"""Validate each non-empty line against: 'fieldname:old_value=new_value'."""

def __call__(self, value: str) -> None:
errors = []
for idx, line in enumerate((raw_line.strip() for raw_line in value.splitlines()), start=1):
if line:
error = self._validate_line(line, idx)
if error:
errors.append(error)
if errors:
raise ValidationError(errors)

def _validate_line(self, line: str, num: int) -> ValidationError | None:
if ":" not in line:
return self._error(num, _("Invalid format. Expected ':' character to separate fieldname from values."))

if line.count("=") != 1:
return self._error(num, _("Invalid format. Expected one '=' character."))

field_part, value_part = line.split(":", 1)
field_name = field_part.strip()
if not field_name:
return self._error(num, _("Invalid format. Field name cannot be empty."))

if "=" not in value_part:
return self._error(num, _("Invalid format. Expected format: 'fieldname:old_value=new_value'"))

old_value, new_value = (val.strip() for val in value_part.split("=", 1))
if old_value == new_value:
return self._error(num, _("Old value and new value must be different"))

return None

def _error(self, num: int, message: str) -> ValidationError:
return ValidationError(_("Line %(num)d: %(message)s") % {"num": num, "message": message}, code="invalid_rule")
Loading