diff --git a/forklift/forklift/pipeline/flows/rapportnav_historical.py b/forklift/forklift/pipeline/flows/rapportnav_historical.py
new file mode 100644
index 0000000..56cb69d
--- /dev/null
+++ b/forklift/forklift/pipeline/flows/rapportnav_historical.py
@@ -0,0 +1,59 @@
+from pathlib import Path
+
+import prefect
+from prefect import Flow, case, unmapped
+
+from forklift.pipeline.shared_tasks import rapportnav
+from forklift.pipeline.shared_tasks.control_flow import check_flow_not_running
+from forklift.pipeline.shared_tasks.generic import (
+    create_database_if_not_exists,
+    drop_table_if_exists,
+    load_df_to_data_warehouse,
+    run_ddl_scripts,
+)
+
+with Flow("RapportNavHistorical") as flow:
+    logger = prefect.context.get("logger")
+    report_types = ["aem"]
+
+    flow_not_running = check_flow_not_running()
+    with case(flow_not_running, True):
+        mission_ids = rapportnav.extract_missions_ids()
+
+        # Chunk mission ids at runtime using a Prefect task so we can map over batches
+        mission_ids_batches = rapportnav.chunk_missions(mission_ids, 100)
+
+        for report_type in report_types:
+            # Map fetch_rapportnav_api over the batches produced by chunk_missions
+            df_batch = rapportnav.fetch_rapportnav_api.map(
+                report_type=unmapped(report_type), missions_ids=mission_ids_batches
+            )
+
+            # Concatenate mapped DataFrames at runtime
+            # If dataframe is empty, stopping the flow here
+            df = rapportnav.concat_dfs(df_batch)
+
+            destination_database = "rapportnav"
+            create_database = create_database_if_not_exists("rapportnav")
+
+            drop_table = drop_table_if_exists(
+                destination_database,
+                report_type,
+                upstream_tasks=[create_database],
+            )
+            created_table = run_ddl_scripts(
+                f"rapportnav/create_{report_type}_if_not_exists.sql",
+                database=destination_database,
+                table=report_type,
+                upstream_tasks=[drop_table],
+            )
+
+            loaded_df = load_df_to_data_warehouse(
+                df,
+                destination_database=destination_database,
+                destination_table=report_type,
+                upstream_tasks=[created_table],
+            )
+
+
+flow.file_name = Path(__file__).name
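The flow above maps over batches produced by `rapportnav.chunk_missions`, whose definition is not visible in this diff. A minimal sketch of the batching contract implied by the flow (batch size 100) and by `test_chunk_list` further down might look like the following; the `chunk_list` helper name comes from the test file, and the `@task` arguments are assumptions:

```python
import prefect
from prefect import task


def chunk_list(items: list, chunk_size: int) -> list:
    """Split items into consecutive batches of at most chunk_size elements."""
    return [items[i : i + chunk_size] for i in range(0, len(items), chunk_size)]


@task(checkpoint=False)  # decorator arguments are an assumption
def chunk_missions(mission_ids: list, chunk_size: int) -> list:
    """Batch mission ids at flow runtime so fetch_rapportnav_api can be mapped."""
    logger = prefect.context.get("logger")
    batches = chunk_list(mission_ids, chunk_size)
    logger.info(f"Split {len(mission_ids)} mission ids into {len(batches)} batches.")
    return batches
```

Wrapping the batching in a task (rather than chunking at flow-build time) matters here because the list of mission ids is only known at runtime, after `extract_missions_ids` has executed.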
diff --git a/forklift/forklift/pipeline/flows/extract_rapportnav_analytics.py b/forklift/forklift/pipeline/shared_tasks/rapportnav.py
similarity index 82%
rename from forklift/forklift/pipeline/flows/extract_rapportnav_analytics.py
rename to forklift/forklift/pipeline/shared_tasks/rapportnav.py
index 412a08f..a57ccff 100644
--- a/forklift/forklift/pipeline/flows/extract_rapportnav_analytics.py
+++ b/forklift/forklift/pipeline/shared_tasks/rapportnav.py
@@ -1,25 +1,16 @@
 import datetime
 import re
-from pathlib import Path
 
 import pandas as pd
 import prefect
 import requests
-from prefect import Flow, case, task, unmapped
+from prefect import task
 from prefect.engine.signals import SKIP
 from prefect.engine.state import Failed
 from prefect.triggers import all_finished
 from unidecode import unidecode
 
 from forklift.config import RAPPORTNAV_API_ENDPOINT, RAPPORTNAV_API_KEY
-from forklift.pipeline.helpers.generic import extract
-from forklift.pipeline.shared_tasks.control_flow import check_flow_not_running
-from forklift.pipeline.shared_tasks.generic import (
-    create_database_if_not_exists,
-    drop_table_if_exists,
-    load_df_to_data_warehouse,
-    run_ddl_scripts,
-)
 
 col_patrol = [
     "id",
@@ -138,6 +129,8 @@ def _extract_control_unit_ids(x):
 
 
 def _process_data(df: pd.DataFrame, report_type: str) -> pd.DataFrame:
+    logger = prefect.context.get("logger")
+
     if not df.empty:
         # Normalize column names using the shared cleaning function
         df.columns = [_clean_str(c, lower=False) for c in df.columns]
@@ -265,6 +258,8 @@ def concat_dfs(dfs: list) -> pd.DataFrame:
     """
     Concatenate a list of DataFrames inside the flow runtime.
    """
+    logger = prefect.context.get("logger")
+
     if isinstance(dfs, Failed):
         logger.error(
             "Aucune tâche fetch_rapportnav_api n a fonctionné. Aucune donnéee disponible"
@@ -281,17 +276,18 @@ def concat_dfs(dfs: list) -> pd.DataFrame:
 def extract_missions_ids() -> list:
     logger = prefect.context.get("logger")
 
-    mission_ids = extract(
-        db_name="monitorenv_remote",
-        query_filepath="monitorenv_remote/missions.sql",
-        parse_dates=["start_datetime_utc"],
-    )
+    # mission_ids = extract(
+    #     db_name="monitorenv_remote",
+    #     query_filepath="monitorenv_remote/missions.sql",
+    #     parse_dates=["start_datetime_utc"],
+    # )
+    mission_ids = pd.read_csv("sandbox/mission_id.csv", sep=";")[:110]
 
     logger.info((f"Found {len(mission_ids)} missions. "))
 
     return list(mission_ids.id)
 
 
-@task(checkpoint=False, max_retries=4, retry_delay=datetime.timedelta(seconds=10))
+@task(checkpoint=True, max_retries=4, retry_delay=datetime.timedelta(seconds=10))
 def fetch_rapportnav_api(report_type: str, missions_ids: list):
     """Fetch results from a RapportNav API and returns it as a DataFrame.
@@ -332,50 +328,3 @@ def fetch_rapportnav_api(report_type: str, missions_ids: list):
     if n_rows:
         df = _process_data(df, report_type)
     return df
-
-
-with Flow("RapportNavAnalytics") as flow:
-    logger = prefect.context.get("logger")
-    report_types = ["aem"]
-
-    flow_not_running = check_flow_not_running()
-    with case(flow_not_running, True):
-        mission_ids = extract_missions_ids()
-
-        # Chunk mission ids at runtime using a Prefect task so we can map over batches
-        mission_ids_batches = chunk_missions(mission_ids, 100)
-
-        for report_type in report_types:
-            # Map fetch_rapportnav_api over the batches produced by chunk_missions
-            df_batch = fetch_rapportnav_api.map(
-                report_type=unmapped(report_type), missions_ids=mission_ids_batches
-            )
-
-            # Concatenate mapped DataFrames at runtime
-            # If dataframe is empty, stopping the flow here
-            df = concat_dfs(df_batch)
-
-            destination_database = "rapportnav"
-            create_database = create_database_if_not_exists("rapportnav")
-
-            drop_table = drop_table_if_exists(
-                destination_database,
-                report_type,
-                upstream_tasks=[create_database],
-            )
-            created_table = run_ddl_scripts(
-                f"rapportnav/create_{report_type}_if_not_exists.sql",
-                database=destination_database,
-                table=report_type,
-                upstream_tasks=[drop_table],
-            )
-
-            loaded_df = load_df_to_data_warehouse(
-                df,
-                destination_database=destination_database,
-                destination_table=report_type,
-                upstream_tasks=[created_table],
-            )
-
-
-flow.file_name = Path(__file__).name
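The hunks above only show the body of `concat_dfs`; its decorator sits outside the diff context. Given the `all_finished` import the module keeps and the `isinstance(dfs, Failed)` check, a plausible shape for this reduce step is sketched below. The decorator arguments and the SKIP messages are assumptions, not the module's actual code:

```python
import pandas as pd
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.engine.state import Failed
from prefect.triggers import all_finished


@task(checkpoint=False, trigger=all_finished)  # assumed decorator
def concat_dfs(dfs: list) -> pd.DataFrame:
    """Concatenate mapped results, skipping downstream tasks when nothing was fetched."""
    logger = prefect.context.get("logger")

    if isinstance(dfs, Failed):
        logger.error("No fetch_rapportnav_api task succeeded. No data available.")
        raise SKIP("No data fetched from the RapportNav API.")

    # With trigger=all_finished, results of failed mapped tasks show up in the
    # list too; keep only real, non-empty DataFrames before concatenating.
    valid = [df for df in dfs if isinstance(df, pd.DataFrame) and not df.empty]
    if not valid:
        raise SKIP("All fetched batches were empty.")
    return pd.concat(valid, ignore_index=True)
```

Raising SKIP here is what the flow comment "If dataframe is empty, stopping the flow here" refers to: the downstream drop/DDL/load tasks never run on an empty result.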
diff --git a/forklift/tests/test_pipeline/test_flows/test_rapportnav.py b/forklift/tests/test_pipeline/test_flows/test_rapportnav.py
index cd2dec3..6a12fe9 100644
--- a/forklift/tests/test_pipeline/test_flows/test_rapportnav.py
+++ b/forklift/tests/test_pipeline/test_flows/test_rapportnav.py
@@ -1,10 +1,10 @@
 import pandas as pd
 import pandas.api.types as ptypes
 
-from forklift.pipeline.flows.extract_rapportnav_analytics import (
+from forklift.pipeline.flows.rapportnav_historical import flow
+from forklift.pipeline.shared_tasks.rapportnav import (
     _process_data,
     extract_missions_ids,
-    flow,
 )
 from tests.mocks import replace_check_flow_not_running
 
@@ -86,9 +86,7 @@ def test__process_data_aem():
 
 
 def test_extract_control_unit_ids():
-    from forklift.pipeline.flows.extract_rapportnav_analytics import (
-        _extract_control_unit_ids,
-    )
+    from forklift.pipeline.shared_tasks.rapportnav import _extract_control_unit_ids
 
     # None or empty -> empty list
     assert _extract_control_unit_ids(None) == []
@@ -120,7 +118,7 @@ def test_extract_missions_ids():
 
 def test_chunk_list():
     """Unit test for chunk_list helper used to batch mission ids."""
-    from forklift.pipeline.flows.extract_rapportnav_analytics import chunk_list
+    from forklift.pipeline.shared_tasks.rapportnav import chunk_list
 
     # Regular splitting
     items = list(range(1, 11))
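The diff truncates `test_chunk_list` after `items = list(range(1, 11))`. Under the batching contract sketched earlier, its assertions would look roughly like this; the expected values are a hypothetical continuation, not part of the diff:

```python
from forklift.pipeline.shared_tasks.rapportnav import chunk_list

items = list(range(1, 11))

# Regular splitting: a trailing partial batch is kept
assert chunk_list(items, 3) == [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
# Exact splitting
assert chunk_list(items, 5) == [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
# An empty input yields no batches
assert chunk_list([], 3) == []
```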