Skip to content

Commit 82dc039

Browse files
feat: create ref_natinfs flow
1 parent 126b76b commit 82dc039

File tree

5 files changed

+99
-0
lines changed

5 files changed

+99
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
CREATE TABLE public.ref_natinfs
2+
(
3+
id SERIAL PRIMARY KEY,
4+
nature VARCHAR,
5+
qualification TEXT,
6+
defined_by VARCHAR,
7+
repressed_by VARCHAR
8+
);

pipeline/src/flows/ref_natinfs.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from pathlib import Path
2+
import pandas as pd
3+
from prefect import flow, get_run_logger, task
4+
5+
from config import LIBRARY_LOCATION
6+
from src.generic_tasks import load
7+
8+
9+
10+
@task
11+
def get_csv_file(directory: str) -> Path:
12+
directory = Path(directory)
13+
if not directory.exists():
14+
raise FileNotFoundError(f"Le dossier {directory} n'existe pas")
15+
16+
csv_files = list(directory.glob("ref_natinfs.csv"))
17+
18+
if len(csv_files) == 0:
19+
raise FileNotFoundError("Le fichier ref_natinfs.csv est introuvable")
20+
21+
csv_file = csv_files[0]
22+
return csv_file
23+
24+
@task
25+
def extract_ref_natinfs(filePath):
26+
return pd.read_csv(
27+
filePath,
28+
keep_default_na=False,
29+
na_values=[""],
30+
sep=";",
31+
encoding="latin1"
32+
)
33+
34+
35+
@task
36+
def load_ref_natinfs(ref_natinfs: pd.DataFrame):
37+
ref_natinfs = ref_natinfs.rename(columns =
38+
{"Numéro NATINF": "id",
39+
"Nature de l'infraction": "nature",
40+
"Qualification de l'infraction": "qualification",
41+
"Définie par":"defined_by",
42+
"Réprimée par": "repressed_by"})
43+
44+
load(
45+
ref_natinfs,
46+
table_name="ref_natinfs",
47+
schema="public",
48+
db_name="monitorenv_remote",
49+
how="replace",
50+
logger=get_run_logger(),
51+
)
52+
53+
54+
@flow(name="MonitorEnv - Ref Natinfs")
55+
def ref_natinfs_flow():
56+
csv_filepath = get_csv_file(LIBRARY_LOCATION / "data/")
57+
ref_natinfs = extract_ref_natinfs(csv_filepath)
58+
load_ref_natinfs(ref_natinfs)

pipeline/tests/mocks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,7 @@ def mock_get_xsd_file(dummy: str):
9595
@task
9696
def mock_delete_files(xml_files: List[Path]) -> None:
9797
pass
98+
99+
@task
100+
def mock_get_csv_file(dummy: str):
101+
return Path(TEST_DATA_LOCATION / "ref_natinfs_csv" / "ref_natinfs.csv")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Numéro NATINF;Nature de l'infraction;Qualification de l'infraction;Définie par;Réprimée par
2+
7;Délit;RETRAIT DE LA PROVISION D'UN CHEQUE AVEC L'INTENTION DE PORTER ATTEINTE AUX DROITS D'AUTRUI;ART.L.163-2 AL.1 C.M.F.;ART.L.163-2 AL.1, ART.L.163-6 AL.1, AL.2 C.M.F.
3+
8;Délit;OPPOSITION AU PAIEMENT D'UN CHEQUE AVEC L'INTENTION DE PORTER ATTEINTE AUX DROITS D'AUTRUI;ART.L.163-2 AL.1 C.M.F.;ART.L.163-2 AL.1, ART.L.163-6 AL.1, AL.2 C.M.F. ART.131-30 AL.1 C.PENAL.
4+
11;Délit;ABANDON DE FAMILLE : NON PAIEMENT D'UNE PENSION OU D'UNE PRESTATION ALIMENTAIRE;ART.227-3 AL.1 C.PENAL.;ART.227-3 AL.1, ART.227-29 C.PENAL.
5+
13;Délit;INFRACTION A UNE INTERDICTION DE SEJOUR : FREQUENTATION D'UN LIEU INTERDIT;ART.434-38 AL.1, ART.131-31 AL.1 C.PENAL.;ART.434-38 AL.1, ART.434-44 AL.4 C.PENAL.
6+
16;Délit;AIDE A L'ENTREE, A LA CIRCULATION OU AU SEJOUR IRREGULIERS D'UN ETRANGER EN FRANCE;ART.L.823-1, ART.L.820-1 C.E.S.E.D.A.;ART.L.823-1 AL.1, ART.L.823-4, ART.L.823-6 C.E.S.E.D.A.
7+
19;Contravention de 5ème classe;NON PAIEMENT D'UNE PENSION ALIMENTAIRE DUE AU CREANCIER, PAR LE TIERS DEBITEUR TENU AU PAIEMENT DIRECT;ART.R.213-5, ART.L.213-1 C.P.C.EX.;ART.R.213-5 C.P.C.EX.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from unittest.mock import patch
2+
3+
import pandas as pd
4+
5+
6+
from src.flows.ref_natinfs import (
7+
ref_natinfs_flow,
8+
)
9+
from src.read_query import read_query
10+
from tests.mocks import (
11+
mock_get_csv_file,
12+
)
13+
14+
15+
@patch("src.flows.ref_natinfs.get_csv_file", mock_get_csv_file)
16+
def test_flow_ref_natinfs(create_cacem_tables, reset_test_data):
17+
state = ref_natinfs_flow(return_state=True)
18+
assert state.is_completed()
19+
20+
ref_natinfs = read_query(query = "SELECT * FROM ref_natinfs", db="monitorenv_remote")
21+
22+
assert len(ref_natinfs) == 6

0 commit comments

Comments
 (0)