1+ import json
2+
13import pendulum
4+ from include .domain .container import Container
5+ from include .utils import multiline_string_to_single_line
6+
27from airflow .decorators import dag , task
38from airflow .exceptions import AirflowSkipException
49from airflow .models .param import Param
5- from include .domain .container import Container
10+
11+ with open ("include/domain/data/ocsge/sources.json" , "r" ) as f :
12+ sources = json .load (f )
613
714
8- def get_geojson_filename (year : int , departement : str ) -> str :
9- return f"occupation_du_sol_{ year } _{ departement } .geojson"
15+ def get_geojson_filename (index : int , departement : str ) -> str :
16+ return f"occupation_du_sol_{ index } _{ departement } .geojson"
1017
1118
12- def get_pmtiles_filename (year : int , departement : str ) -> str :
13- return f"occupation_du_sol_{ year } _{ departement } .pmtiles"
19+ def get_pmtiles_filename (index : int , departement : str ) -> str :
20+ return f"occupation_du_sol_{ index } _{ departement } .pmtiles"
1421
1522
1623@dag (
@@ -23,8 +30,8 @@ def get_pmtiles_filename(year: int, departement: str) -> str:
2330 tags = ["OCS GE" ],
2431 max_active_tasks = 10 ,
2532 params = {
26- "year " : Param (2018 , type = "integer" ),
27- "departement" : Param ("75" , type = "string" ),
33+ "index " : Param (1 , type = "integer" , enum = [ 1 , 2 ] ),
34+ "departement" : Param ("75" , type = "string" , enum = list ( sources . keys ()) ),
2835 "refresh_existing" : Param (False , type = "boolean" ),
2936 },
3037)
@@ -36,35 +43,55 @@ def create_ocsge_vector_tiles():
3643 def check_if_vector_tiles_not_exist (params : dict ):
3744 if params .get ("refresh_existing" ):
3845 return
39- year = params .get ("year " )
46+ index = params .get ("index " )
4047 departement = params .get ("departement" )
41- filename = get_pmtiles_filename (year , departement )
48+ filename = get_pmtiles_filename (index , departement )
4249 exists = Container ().s3 ().exists (f"{ bucket_name } /{ vector_tiles_dir } /{ filename } " )
4350 if exists :
4451 raise AirflowSkipException ("Vector tiles already exist" )
4552
4653 @task .python (trigger_rule = "none_skipped" )
4754 def postgis_to_geojson (params : dict ):
48- year = params .get ("year " )
55+ index = params .get ("index " )
4956 departement = params .get ("departement" )
50- filename = get_geojson_filename (year , departement )
57+ filename = get_geojson_filename (index , departement )
58+
59+ sql = f"""
60+ SELECT
61+ id,
62+ code_cs,
63+ code_us,
64+ departement,
65+ year,
66+ index,
67+ is_impermeable,
68+ is_artificial,
69+ critere_seuil,
70+ st_transform(geom, 4326) as geom,
71+ surface
72+ FROM
73+ public_ocsge.occupation_du_sol_with_artif
74+ WHERE
75+ index = { index } and
76+ departement = '{ departement } '
77+ """
5178
5279 return (
5380 Container ()
5481 .sql_to_geojsonseq_on_s3_handler ()
5582 .export_sql_result_to_geojsonseq_on_s3 (
56- sql = f"SELECT * FROM public_ocsge.occupation_du_sol WHERE year = { year } and departement = ' { departement } '" , # noqa: E501
83+ sql = multiline_string_to_single_line ( sql ) , # noqa: E501
5784 s3_key = f"{ vector_tiles_dir } /{ filename } " ,
5885 s3_bucket = bucket_name ,
5986 )
6087 )
6188
6289 @task .bash (skip_on_exit_code = 110 , trigger_rule = "none_skipped" )
6390 def geojson_to_pmtiles (params : dict ):
64- year = params .get ("year " )
91+ index = params .get ("index " )
6592 departement = params .get ("departement" )
66- geojson_filename = get_geojson_filename (year , departement )
67- pmtiles_filename = get_pmtiles_filename (year , departement )
93+ geojson_filename = get_geojson_filename (index , departement )
94+ pmtiles_filename = get_pmtiles_filename (index , departement )
6895 local_input = f"/tmp/{ geojson_filename } "
6996 local_output = f"/tmp/{ pmtiles_filename } "
7097 Container ().s3 ().get_file (f"{ bucket_name } /{ vector_tiles_dir } /{ geojson_filename } " , local_input )
@@ -87,25 +114,25 @@ def geojson_to_pmtiles(params: dict):
87114
88115 @task .python (trigger_rule = "none_skipped" )
89116 def upload (params : dict ):
90- year = params .get ("year " )
117+ index = params .get ("index " )
91118 departement = params .get ("departement" )
92- pmtiles_filename = get_pmtiles_filename (year , departement )
119+ pmtiles_filename = get_pmtiles_filename (index , departement )
93120 local_path = f"/tmp/{ pmtiles_filename } "
94121 path_on_s3 = f"{ bucket_name } /{ vector_tiles_dir } /{ pmtiles_filename } "
95122 Container ().s3 ().put (local_path , path_on_s3 )
96123
97124 @task .bash (trigger_rule = "none_skipped" )
98125 def delete_geojson_file (params : dict ):
99- year = params .get ("year " )
126+ index = params .get ("index " )
100127 departement = params .get ("departement" )
101- geojson_filename = get_geojson_filename (year , departement )
128+ geojson_filename = get_geojson_filename (index , departement )
102129 return f"rm /tmp/{ geojson_filename } "
103130
104131 @task .bash (trigger_rule = "none_skipped" )
105132 def delete_pmtiles_file (params : dict ):
106- year = params .get ("year " )
133+ index = params .get ("index " )
107134 departement = params .get ("departement" )
108- pmtiles_filename = get_pmtiles_filename (year , departement )
135+ pmtiles_filename = get_pmtiles_filename (index , departement )
109136 return f"rm /tmp/{ pmtiles_filename } "
110137
111138 (
0 commit comments