Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions forklift/forklift/pipeline/flows/rapportnav_historical.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from pathlib import Path

import prefect
from prefect import Flow, case, unmapped

from forklift.pipeline.shared_tasks import rapportnav
from forklift.pipeline.shared_tasks.control_flow import check_flow_not_running
from forklift.pipeline.shared_tasks.generic import (
create_database_if_not_exists,
drop_table_if_exists,
load_df_to_data_warehouse,
run_ddl_scripts,
)

with Flow("RapportNavHistorical") as flow:
logger = prefect.context.get("logger")
report_types = ["aem"]

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):
mission_ids = rapportnav.extract_missions_ids()

# Chunk mission ids at runtime using a Prefect task so we can map over batches
mission_ids_batches = rapportnav.chunk_missions(mission_ids, 100)

for report_type in report_types:
# Map fetch_rapportnav_api over the batches produced by chunk_missions
df_batch = rapportnav.fetch_rapportnav_api.map(
report_type=unmapped(report_type), missions_ids=mission_ids_batches
)

# Concatenate mapped DataFrames at runtime
# If dataframe is empty, stopping the flow here
df = rapportnav.concat_dfs(df_batch)

destination_database = "rapportnav"
create_database = create_database_if_not_exists("rapportnav")

drop_table = drop_table_if_exists(
destination_database,
report_type,
upstream_tasks=[create_database],
)
created_table = run_ddl_scripts(
f"rapportnav/create_{report_type}_if_not_exists.sql",
database=destination_database,
table=report_type,
upstream_tasks=[drop_table],
)

loaded_df = load_df_to_data_warehouse(
df,
destination_database=destination_database,
destination_table=report_type,
upstream_tasks=[created_table],
)


flow.file_name = Path(__file__).name
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
import datetime
import re
from pathlib import Path

import pandas as pd
import prefect
import requests
from prefect import Flow, case, task, unmapped
from prefect import task
from prefect.engine.signals import SKIP
from prefect.engine.state import Failed
from prefect.triggers import all_finished
from unidecode import unidecode

from forklift.config import RAPPORTNAV_API_ENDPOINT, RAPPORTNAV_API_KEY
from forklift.pipeline.helpers.generic import extract
from forklift.pipeline.shared_tasks.control_flow import check_flow_not_running
from forklift.pipeline.shared_tasks.generic import (
create_database_if_not_exists,
drop_table_if_exists,
load_df_to_data_warehouse,
run_ddl_scripts,
)

col_patrol = [
"id",
Expand Down Expand Up @@ -138,6 +129,8 @@ def _extract_control_unit_ids(x):


def _process_data(df: pd.DataFrame, report_type: str) -> pd.DataFrame:
logger = prefect.context.get("logger")

if not df.empty:
# Normalize column names using the shared cleaning function
df.columns = [_clean_str(c, lower=False) for c in df.columns]
Expand Down Expand Up @@ -265,6 +258,8 @@ def concat_dfs(dfs: list) -> pd.DataFrame:
"""
Concatenate a list of DataFrames inside the flow runtime.
"""
logger = prefect.context.get("logger")

if isinstance(dfs, Failed):
logger.error(
"Aucune tâche fetch_rapportnav_api n a fonctionné. Aucune donnéee disponible"
Expand All @@ -281,17 +276,18 @@ def concat_dfs(dfs: list) -> pd.DataFrame:
def extract_missions_ids() -> list:
logger = prefect.context.get("logger")

mission_ids = extract(
db_name="monitorenv_remote",
query_filepath="monitorenv_remote/missions.sql",
parse_dates=["start_datetime_utc"],
)
# mission_ids = extract(
# db_name="monitorenv_remote",
# query_filepath="monitorenv_remote/missions.sql",
# parse_dates=["start_datetime_utc"],
# )
mission_ids = pd.read_csv("sandbox/mission_id.csv", sep=";")[:110]

logger.info((f"Found {len(mission_ids)} missions. "))
return list(mission_ids.id)


@task(checkpoint=False, max_retries=4, retry_delay=datetime.timedelta(seconds=10))
@task(checkpoint=True, max_retries=4, retry_delay=datetime.timedelta(seconds=10))
def fetch_rapportnav_api(report_type: str, missions_ids: list):
"""Fetch results from a RapportNav API and returns it as a DataFrame.

Expand Down Expand Up @@ -332,50 +328,3 @@ def fetch_rapportnav_api(report_type: str, missions_ids: list):
if n_rows:
df = _process_data(df, report_type)
return df


with Flow("RapportNavAnalytics") as flow:
logger = prefect.context.get("logger")
report_types = ["aem"]

flow_not_running = check_flow_not_running()
with case(flow_not_running, True):
mission_ids = extract_missions_ids()

# Chunk mission ids at runtime using a Prefect task so we can map over batches
mission_ids_batches = chunk_missions(mission_ids, 100)

for report_type in report_types:
# Map fetch_rapportnav_api over the batches produced by chunk_missions
df_batch = fetch_rapportnav_api.map(
report_type=unmapped(report_type), missions_ids=mission_ids_batches
)

# Concatenate mapped DataFrames at runtime
# If dataframe is empty, stopping the flow here
df = concat_dfs(df_batch)

destination_database = "rapportnav"
create_database = create_database_if_not_exists("rapportnav")

drop_table = drop_table_if_exists(
destination_database,
report_type,
upstream_tasks=[create_database],
)
created_table = run_ddl_scripts(
f"rapportnav/create_{report_type}_if_not_exists.sql",
database=destination_database,
table=report_type,
upstream_tasks=[drop_table],
)

loaded_df = load_df_to_data_warehouse(
df,
destination_database=destination_database,
destination_table=report_type,
upstream_tasks=[created_table],
)


flow.file_name = Path(__file__).name
10 changes: 4 additions & 6 deletions forklift/tests/test_pipeline/test_flows/test_rapportnav.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import pandas as pd
import pandas.api.types as ptypes

from forklift.pipeline.flows.extract_rapportnav_analytics import (
from forklift.pipeline.flows.rapportnav_historical import flow
from forklift.pipeline.shared_tasks.rapportnav import (
_process_data,
extract_missions_ids,
flow,
)
from tests.mocks import replace_check_flow_not_running

Expand Down Expand Up @@ -86,9 +86,7 @@ def test__process_data_aem():


def test_extract_control_unit_ids():
from forklift.pipeline.flows.extract_rapportnav_analytics import (
_extract_control_unit_ids,
)
from forklift.pipeline.flows.rapportnav_historical import _extract_control_unit_ids

# None or empty -> empty list
assert _extract_control_unit_ids(None) == []
Expand Down Expand Up @@ -120,7 +118,7 @@ def test_extract_missions_ids():

def test_chunk_list():
"""Unit test for chunk_list helper used to batch mission ids."""
from forklift.pipeline.flows.extract_rapportnav_analytics import chunk_list
from forklift.pipeline.flows.rapportnav_historical import chunk_list

# Regular splitting
items = list(range(1, 11))
Expand Down
Loading