Skip to content

Commit f5c2161

Browse files
authored
Merge pull request #1460 from MTES-MCT/feat-insee-data
Ingest du dossier complet insee
2 parents 2e1afb9 + a7ef3bb commit f5c2161

File tree

172 files changed

+10397
-132
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

172 files changed

+10397
-132
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import os
2+
3+
import pandas as pd
4+
from include.container import DomainContainer as Container
5+
from include.container import InfraContainer
6+
from pendulum import datetime
7+
8+
from airflow.decorators import dag, task
9+
10+
URL = "https://www.insee.fr/fr/statistiques/fichier/5359146/dossier_complet.zip"
11+
12+
13+
@dag(
14+
start_date=datetime(2024, 1, 1),
15+
schedule="@once",
16+
catchup=False,
17+
doc_md=__doc__,
18+
max_active_runs=1,
19+
default_args={"owner": "Alexis Athlani", "retries": 3},
20+
tags=["INSEE"],
21+
)
22+
def ingest_dossier_complet():
23+
bucket_name = InfraContainer().bucket_name()
24+
s3_key = "insee/dossier_complet.csv"
25+
26+
@task.python
27+
def download() -> str:
28+
return (
29+
Container()
30+
.remote_zip_to_s3_file_handler()
31+
.download_zip_extract_and_upload_to_s3(
32+
url=URL,
33+
s3_key=s3_key,
34+
s3_bucket=bucket_name,
35+
target_extension=".csv",
36+
)
37+
)
38+
39+
@task.python
40+
def ingest() -> int | None:
41+
"""Unpivot le CSV (1900+ colonnes) en format EAV (codgeo, variable, value)
42+
pour contourner la limite de 1600 colonnes PostgreSQL."""
43+
s3_path = f"{bucket_name}/{s3_key}"
44+
tmp_localpath = "/tmp/dossier_complet.csv"
45+
46+
InfraContainer().s3().get_file(s3_path, tmp_localpath)
47+
48+
engine = InfraContainer().sqlalchemy_dbt_conn()
49+
table_name = "insee_dossier_complet"
50+
chunk_size = 5000
51+
total_rows = 0
52+
53+
for i, chunk in enumerate(pd.read_csv(tmp_localpath, sep=";", dtype=str, chunksize=chunk_size)):
54+
melted = chunk.melt(
55+
id_vars=["CODGEO"],
56+
var_name="variable",
57+
value_name="value",
58+
)
59+
melted = melted.dropna(subset=["value"])
60+
row_count = melted.to_sql(
61+
name=table_name,
62+
con=engine,
63+
if_exists="replace" if i == 0 else "append",
64+
index=False,
65+
)
66+
total_rows += row_count if row_count else len(melted)
67+
68+
os.remove(tmp_localpath)
69+
return total_rows
70+
71+
download() >> ingest()
72+
73+
74+
ingest_dossier_complet()

airflow/dags/update_app.py

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,28 @@ def copy_table_from_dw_to_app(
154154
"copy_public_data_landfriche",
155155
"copy_public_data_landfrichegeojson",
156156
"copy_public_data_landcarroyagebounds",
157+
"copy_public_data_dc_population",
158+
"copy_public_data_dc_menages",
159+
"copy_public_data_dc_logement",
160+
"copy_public_data_dc_mobilite_residentielle",
161+
"copy_public_data_dc_categories_socioprofessionnelles",
162+
"copy_public_data_dc_situation_conjugale",
163+
"copy_public_data_dc_scolarisation_diplomes",
164+
"copy_public_data_dc_emploi_statut",
165+
"copy_public_data_dc_deplacements_domicile_travail",
166+
"copy_public_data_dc_activite_chomage",
167+
"copy_public_data_dc_emplois_lieu_travail",
168+
"copy_public_data_dc_logement_confort",
169+
"copy_public_data_dc_historique",
170+
"copy_public_data_dc_revenus_pauvrete",
171+
"copy_public_data_dc_salaires",
172+
"copy_public_data_dc_etablissements",
173+
"copy_public_data_dc_creations_entreprises",
174+
"copy_public_data_dc_creations_etablissements",
175+
"copy_public_data_dc_unites_legales_actives",
176+
"copy_public_data_dc_tourisme",
177+
"copy_public_data_dc_electeurs",
178+
"copy_public_data_dc_equipements_bpe",
157179
],
158180
type="array",
159181
),
@@ -721,6 +743,203 @@ def copy_public_data_landcarroyagebounds(**context):
721743
["land_id", "land_type", "start_year", "end_year", "destination"],
722744
],
723745
)
746+
@task.python
747+
def copy_public_data_dc_population(**context):
748+
return copy_table_from_dw_to_app(
749+
from_table="public_for_app.for_app_dc_population",
750+
to_table="public.public_data_dc_population",
751+
environment=context["params"]["environment"],
752+
btree_index_columns=[["land_id", "land_type"]],
753+
)
754+
755+
@task.python
756+
def copy_public_data_dc_menages(**context):
757+
return copy_table_from_dw_to_app(
758+
from_table="public_for_app.for_app_dc_menages",
759+
to_table="public.public_data_dc_menages",
760+
environment=context["params"]["environment"],
761+
btree_index_columns=[["land_id", "land_type"]],
762+
)
763+
764+
@task.python
765+
def copy_public_data_dc_logement(**context):
766+
return copy_table_from_dw_to_app(
767+
from_table="public_for_app.for_app_dc_logement",
768+
to_table="public.public_data_dc_logement",
769+
environment=context["params"]["environment"],
770+
btree_index_columns=[["land_id", "land_type"]],
771+
)
772+
773+
@task.python
774+
def copy_public_data_dc_mobilite_residentielle(**context):
775+
return copy_table_from_dw_to_app(
776+
from_table="public_for_app.for_app_dc_mobilite_residentielle",
777+
to_table="public.public_data_dc_mobilite_residentielle",
778+
environment=context["params"]["environment"],
779+
btree_index_columns=[["land_id", "land_type"]],
780+
)
781+
782+
@task.python
783+
def copy_public_data_dc_categories_socioprofessionnelles(**context):
784+
return copy_table_from_dw_to_app(
785+
from_table="public_for_app.for_app_dc_categories_socioprofessionnelles",
786+
to_table="public.public_data_dc_categories_socioprofessionnelles",
787+
environment=context["params"]["environment"],
788+
btree_index_columns=[["land_id", "land_type"]],
789+
)
790+
791+
@task.python
792+
def copy_public_data_dc_situation_conjugale(**context):
793+
return copy_table_from_dw_to_app(
794+
from_table="public_for_app.for_app_dc_situation_conjugale",
795+
to_table="public.public_data_dc_situation_conjugale",
796+
environment=context["params"]["environment"],
797+
btree_index_columns=[["land_id", "land_type"]],
798+
)
799+
800+
@task.python
801+
def copy_public_data_dc_scolarisation_diplomes(**context):
802+
return copy_table_from_dw_to_app(
803+
from_table="public_for_app.for_app_dc_scolarisation_diplomes",
804+
to_table="public.public_data_dc_scolarisation_diplomes",
805+
environment=context["params"]["environment"],
806+
btree_index_columns=[["land_id", "land_type"]],
807+
)
808+
809+
@task.python
810+
def copy_public_data_dc_emploi_statut(**context):
811+
return copy_table_from_dw_to_app(
812+
from_table="public_for_app.for_app_dc_emploi_statut",
813+
to_table="public.public_data_dc_emploi_statut",
814+
environment=context["params"]["environment"],
815+
btree_index_columns=[["land_id", "land_type"]],
816+
)
817+
818+
@task.python
819+
def copy_public_data_dc_deplacements_domicile_travail(**context):
820+
return copy_table_from_dw_to_app(
821+
from_table="public_for_app.for_app_dc_deplacements_domicile_travail",
822+
to_table="public.public_data_dc_deplacements_domicile_travail",
823+
environment=context["params"]["environment"],
824+
btree_index_columns=[["land_id", "land_type"]],
825+
)
826+
827+
@task.python
828+
def copy_public_data_dc_activite_chomage(**context):
829+
return copy_table_from_dw_to_app(
830+
from_table="public_for_app.for_app_dc_activite_chomage",
831+
to_table="public.public_data_dc_activite_chomage",
832+
environment=context["params"]["environment"],
833+
btree_index_columns=[["land_id", "land_type"]],
834+
)
835+
836+
@task.python
837+
def copy_public_data_dc_emplois_lieu_travail(**context):
838+
return copy_table_from_dw_to_app(
839+
from_table="public_for_app.for_app_dc_emplois_lieu_travail",
840+
to_table="public.public_data_dc_emplois_lieu_travail",
841+
environment=context["params"]["environment"],
842+
btree_index_columns=[["land_id", "land_type"]],
843+
)
844+
845+
@task.python
846+
def copy_public_data_dc_logement_confort(**context):
847+
return copy_table_from_dw_to_app(
848+
from_table="public_for_app.for_app_dc_logement_confort",
849+
to_table="public.public_data_dc_logement_confort",
850+
environment=context["params"]["environment"],
851+
btree_index_columns=[["land_id", "land_type"]],
852+
)
853+
854+
@task.python
855+
def copy_public_data_dc_historique(**context):
856+
return copy_table_from_dw_to_app(
857+
from_table="public_for_app.for_app_dc_historique",
858+
to_table="public.public_data_dc_historique",
859+
environment=context["params"]["environment"],
860+
btree_index_columns=[["land_id", "land_type"]],
861+
)
862+
863+
@task.python
864+
def copy_public_data_dc_revenus_pauvrete(**context):
865+
return copy_table_from_dw_to_app(
866+
from_table="public_for_app.for_app_dc_revenus_pauvrete",
867+
to_table="public.public_data_dc_revenus_pauvrete",
868+
environment=context["params"]["environment"],
869+
btree_index_columns=[["land_id", "land_type"]],
870+
)
871+
872+
@task.python
873+
def copy_public_data_dc_salaires(**context):
874+
return copy_table_from_dw_to_app(
875+
from_table="public_for_app.for_app_dc_salaires",
876+
to_table="public.public_data_dc_salaires",
877+
environment=context["params"]["environment"],
878+
btree_index_columns=[["land_id", "land_type"]],
879+
)
880+
881+
@task.python
882+
def copy_public_data_dc_etablissements(**context):
883+
return copy_table_from_dw_to_app(
884+
from_table="public_for_app.for_app_dc_etablissements",
885+
to_table="public.public_data_dc_etablissements",
886+
environment=context["params"]["environment"],
887+
btree_index_columns=[["land_id", "land_type"]],
888+
)
889+
890+
@task.python
891+
def copy_public_data_dc_creations_entreprises(**context):
892+
return copy_table_from_dw_to_app(
893+
from_table="public_for_app.for_app_dc_creations_entreprises",
894+
to_table="public.public_data_dc_creations_entreprises",
895+
environment=context["params"]["environment"],
896+
btree_index_columns=[["land_id", "land_type"]],
897+
)
898+
899+
@task.python
900+
def copy_public_data_dc_creations_etablissements(**context):
901+
return copy_table_from_dw_to_app(
902+
from_table="public_for_app.for_app_dc_creations_etablissements",
903+
to_table="public.public_data_dc_creations_etablissements",
904+
environment=context["params"]["environment"],
905+
btree_index_columns=[["land_id", "land_type"]],
906+
)
907+
908+
@task.python
909+
def copy_public_data_dc_unites_legales_actives(**context):
910+
return copy_table_from_dw_to_app(
911+
from_table="public_for_app.for_app_dc_unites_legales_actives",
912+
to_table="public.public_data_dc_unites_legales_actives",
913+
environment=context["params"]["environment"],
914+
btree_index_columns=[["land_id", "land_type"]],
915+
)
916+
917+
@task.python
918+
def copy_public_data_dc_tourisme(**context):
919+
return copy_table_from_dw_to_app(
920+
from_table="public_for_app.for_app_dc_tourisme",
921+
to_table="public.public_data_dc_tourisme",
922+
environment=context["params"]["environment"],
923+
btree_index_columns=[["land_id", "land_type"]],
924+
)
925+
926+
@task.python
927+
def copy_public_data_dc_electeurs(**context):
928+
return copy_table_from_dw_to_app(
929+
from_table="public_for_app.for_app_dc_electeurs",
930+
to_table="public.public_data_dc_electeurs",
931+
environment=context["params"]["environment"],
932+
btree_index_columns=[["land_id", "land_type"]],
933+
)
934+
935+
@task.python
936+
def copy_public_data_dc_equipements_bpe(**context):
937+
return copy_table_from_dw_to_app(
938+
from_table="public_for_app.for_app_dc_equipements_bpe",
939+
to_table="public.public_data_dc_equipements_bpe",
940+
environment=context["params"]["environment"],
941+
btree_index_columns=[["land_id", "land_type"]],
942+
)
724943

725944
@task.branch
726945
def copy_public_data_branch(**context):
@@ -775,6 +994,28 @@ def copy_public_data_branch(**context):
775994
copy_public_data_landfriche(),
776995
copy_public_data_landfrichegeojson(),
777996
copy_public_data_landcarroyagebounds(),
997+
copy_public_data_dc_population(),
998+
copy_public_data_dc_menages(),
999+
copy_public_data_dc_logement(),
1000+
copy_public_data_dc_mobilite_residentielle(),
1001+
copy_public_data_dc_categories_socioprofessionnelles(),
1002+
copy_public_data_dc_situation_conjugale(),
1003+
copy_public_data_dc_scolarisation_diplomes(),
1004+
copy_public_data_dc_emploi_statut(),
1005+
copy_public_data_dc_deplacements_domicile_travail(),
1006+
copy_public_data_dc_activite_chomage(),
1007+
copy_public_data_dc_emplois_lieu_travail(),
1008+
copy_public_data_dc_logement_confort(),
1009+
copy_public_data_dc_historique(),
1010+
copy_public_data_dc_revenus_pauvrete(),
1011+
copy_public_data_dc_salaires(),
1012+
copy_public_data_dc_etablissements(),
1013+
copy_public_data_dc_creations_entreprises(),
1014+
copy_public_data_dc_creations_etablissements(),
1015+
copy_public_data_dc_unites_legales_actives(),
1016+
copy_public_data_dc_tourisme(),
1017+
copy_public_data_dc_electeurs(),
1018+
copy_public_data_dc_equipements_bpe(),
7781019
]
7791020

7801021

0 commit comments

Comments
 (0)