1- from logging import Logger
2- import pdb
3- from typing import Tuple
41
52import geopandas as gpd
63import pandas as pd
74from prefect import flow , get_run_logger , task
8- from pyproj import Geod
9- from sqlalchemy import bindparam , text
105
11- from src .db_config import create_engine
126from src .generic_tasks import extract , load
13- from src .processing import (
14- drop_duplicates_by_decreasing_priority ,
15- join_on_multiple_keys ,
16- left_isin_right_by_decreasing_priority ,
17- )
187
198
209@task
@@ -42,6 +31,21 @@ def validate_action(action: str) -> str:
4231 f"action must be one of { ', ' .join (valid_actions )} , got { action } "
4332 )
4433
@task
def join(
    previous_last_positions: pd.DataFrame,
    last_positions: pd.DataFrame,
) -> pd.DataFrame:
    """
    Combine previously known and freshly extracted last positions, keeping
    only the most recent position (highest `ts`) of each `mmsi`.

    Args:
        previous_last_positions (pd.DataFrame): previously loaded last
            positions. Must have columns `mmsi` and `ts`.
        last_positions (pd.DataFrame): newly extracted last positions. Must
            have columns `mmsi` and `ts`.

    Returns:
        pd.DataFrame: at most one row per mmsi, the most recent of both inputs.
    """
    # NOTE(review): rows with a NaN `mmsi` are silently dropped by groupby
    # (default dropna=True) — confirm this is intended.
    combined = pd.concat([previous_last_positions, last_positions])
    most_recent_first = combined.sort_values("ts", ascending=False)
    return most_recent_first.groupby("mmsi").head(1)
4549
4650@task
4751def extract_last_positions (minutes : int ) -> gpd .GeoDataFrame :
@@ -61,119 +65,6 @@ def extract_last_positions(minutes: int) -> gpd.GeoDataFrame:
6165 geom_col = "coord"
6266 )
6367
@task
def extract_latest_vessels(mmsi: list[int]) -> pd.DataFrame:
    """
    Extracts vessels matching the given MMSI numbers from the
    `latest_vessels` materialized view.

    Args:
        mmsi (list[int]): MMSI numbers to look up.

    Returns:
        pd.DataFrame: DataFrame with columns `vessel_id` and `mmsi`.
    """
    query = text("""
        SELECT
            ship_id AS vessel_id,
            CASE WHEN mmsi_number IS NOT NULL THEN mmsi_number::integer ELSE NULL END as mmsi
        FROM latest_vessels
        WHERE mmsi_number IN :mmsi
    """).bindparams(bindparam("mmsi", expanding=True))

    engine = create_engine(db="monitorenv_remote")
    # mmsi_number is compared as text in the view, hence the str conversion.
    params = {"mmsi": [str(x) for x in mmsi]}
    return pd.read_sql(query, engine, params=params)
83-
@task
def drop_duplicates(positions: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Keep only the most recent position of each vessel.

    Although the query that computes last positions already contains a
    DISTINCT ON clause, some vessels' positions arrive twice with partially
    different identifiers — for instance the same CFR but a different ircs or
    external immatriculation — so an extra de-duplication pass is required.

    De-duplication uses, by decreasing priority, vessel_id and mmsi.

    Args:
        positions (gpd.GeoDataFrame): positions of vessels. Must contain
            columns "vessel_id", "mmsi" and "ts".

    Returns:
        gpd.GeoDataFrame: vessels' last positions with duplicates removed.
    """
    most_recent_first = positions.sort_values(by="ts", ascending=False)
    return drop_duplicates_by_decreasing_priority(
        most_recent_first,
        subset=["vessel_id", "mmsi"],
    )
107-
@task
def add_vessel_id(last_positions: gpd.GeoDataFrame) -> pd.DataFrame:
    """
    Adds a `vessel_id` column to the input `DataFrame` by:

    - querying the `latest_vessels` materialized view (via
      `extract_latest_vessels`) for vessels with a matching `mmsi`
    - matching the found vessels to the input positions using the
      `merge_vessel_id` helper (left join on `mmsi`).

    If the input already has a `vessel_id` column, it is returned unmodified
    (with a warning).

    Args:
        last_positions (gpd.GeoDataFrame): DataFrame of last_positions. Must
            have an `mmsi` column.

    Returns:
        pd.DataFrame: Same as input with an added `vessel_id` column.
    """
    logger = get_run_logger()

    if "vessel_id" in last_positions:
        logger.warning(
            (
                "Column `vessel_id` already present in input DataFrame, "
                "returning unmodified input."
            )
        )
        return last_positions

    # Only query the database for non-null, distinct MMSIs.
    found_vessels = extract_latest_vessels(
        last_positions.mmsi.dropna().drop_duplicates().to_list()
    )

    last_positions_with_vessel_id = merge_vessel_id(
        last_positions, found_vessels, logger
    )

    return last_positions_with_vessel_id
139-
140-
141-
142-
def merge_vessel_id(
    last_positions: pd.DataFrame, found_vessels: pd.DataFrame, logger: Logger
) -> pd.DataFrame:
    """
    Add the `vessel_id` of `found_vessels` to `last_positions` by a left join
    on `mmsi`.

    The two input DataFrames are assumed to be:

    - a list of last_positions with `mmsi` identifiers (plus potential other
      columns) without a `vessel_id` column
    - a list of vessels with `mmsi` and `vessel_id` columns. Typically these
      are the vessels found in the `latest_vessels` table that match the
      identifiers of the positions.

    The result always has exactly the same rows as the first input DataFrame:
    if `found_vessels` contains a duplicated `mmsi`, only its first occurrence
    is used (a warning is logged), so the left join cannot fan out rows.

    Args:
        last_positions (pd.DataFrame): positions to match to a found vessel.
            A `gpd.GeoDataFrame` is accepted (it subclasses `pd.DataFrame`).
        found_vessels (pd.DataFrame): vessels to match, with columns `mmsi`
            and `vessel_id`.
        logger (Logger): Logger instance.

    Returns:
        pd.DataFrame: same rows as `last_positions` with an added `vessel_id`
            column (NaN where no vessel matched), reindexed from 0 to n - 1.
    """
    last_positions = last_positions.copy()
    vessels = found_vessels[["mmsi", "vessel_id"]]

    # A duplicated mmsi on the right side of a left join would duplicate rows
    # of the result, breaking the "same rows as the input" guarantee.
    duplicated = vessels["mmsi"].duplicated()
    if duplicated.any():
        logger.warning(
            "%d duplicated mmsi in found_vessels, keeping first occurrence.",
            int(duplicated.sum()),
        )
        vessels = vessels[~duplicated]

    merged = last_positions.merge(vessels, how="left", on="mmsi")
    return merged.reset_index(drop=True)
175-
176-
17768@task
17869def extract_previous_last_positions () -> gpd .GeoDataFrame :
17970 """
@@ -191,119 +82,6 @@ def extract_previous_last_positions() -> gpd.GeoDataFrame:
19182 geom_col = "coord"
19283 )
19384
194-
@task
def drop_unchanged_new_last_positions(
    new_last_positions: pd.DataFrame, previous_last_positions: pd.DataFrame
) -> pd.DataFrame:
    """
    Remove from `new_last_positions` the positions that are already present in
    `previous_last_positions`, matched on `id`.

    Args:
        previous_last_positions (pd.DataFrame)
        new_last_positions (pd.DataFrame)

    Returns:
        pd.DataFrame: subset of new_last_positions
    """
    already_known = new_last_positions.id.isin(set(previous_last_positions.id))
    return new_last_positions[~already_known].copy(deep=True)
213-
214-
@task
def split(
    previous_last_positions: gpd.GeoDataFrame, new_last_positions: gpd.GeoDataFrame
) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Splits vessels into 3 categories:

    - vessels present only in previous_last_positions (known vessels that
      haven't moved)
    - vessels present only in new_last_positions (new vessels never seen
      before)
    - vessels present in both datasets (known vessels that have moved and
      whose position must be updated)

    The last_positions data of these 3 sets of vessels is returned in 3
    separate DataFrames. For vessels whose position must be updated, the
    returned DataFrame contains the data of both the previous and the new
    last_position, making it possible to compute metrics such as the emission
    period.

    Args:
        previous_last_positions (gpd.GeoDataFrame)
        new_last_positions (gpd.GeoDataFrame)

    Returns:
        Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame, gpd.GeoDataFrame]:
            - unchanged_previous_last_positions
            - new_vessels_last_positions
            - last_positions_to_update
    """
    previous_last_positions = previous_last_positions.copy(deep=True)
    new_last_positions = new_last_positions.copy(deep=True)

    vessel_id_cols = ["vessel_id", "mmsi"]

    # Membership of each side's vessels in the other side, matched by
    # decreasing priority on vessel_id then mmsi.
    previous_in_new = left_isin_right_by_decreasing_priority(
        previous_last_positions[vessel_id_cols], new_last_positions[vessel_id_cols]
    )
    new_in_previous = left_isin_right_by_decreasing_priority(
        new_last_positions[vessel_id_cols], previous_last_positions[vessel_id_cols]
    )

    unchanged_previous_last_positions = previous_last_positions[~previous_in_new]
    new_vessels_last_positions = new_last_positions[~new_in_previous]

    # Vessels present on both sides: keep the new data plus the previous `ts`.
    last_positions_to_update = join_on_multiple_keys(
        new_last_positions,
        previous_last_positions[vessel_id_cols + ["ts"]],
        or_join_keys=vessel_id_cols,
        how="inner",
        coalesce_common_columns=False,
    )

    return (
        unchanged_previous_last_positions,
        new_vessels_last_positions,
        last_positions_to_update,
    )
273-
274-
@task
def concatenate(
    unchanged_previous_last_positions: gpd.GeoDataFrame,
    new_vessels_last_positions: gpd.GeoDataFrame,
    updated_last_positions: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Concatenates the 3 sets of last_positions and reindexes the rows from 0 to
    n - 1.

    Args:
        unchanged_previous_last_positions (pd.DataFrame)
        new_vessels_last_positions (pd.DataFrame)
        updated_last_positions (pd.DataFrame)

    Returns:
        pd.DataFrame: concatenation of the 3 input sets of last_positions
    """
    last_positions = pd.concat(
        [
            unchanged_previous_last_positions,
            new_vessels_last_positions,
            updated_last_positions,
        ]
    )

    # `reset_index(drop=True)` discards the old index instead of inserting it
    # as a column. The previous `.reset_index().drop(columns=["index"])` would
    # raise a KeyError whenever the concatenated index is named, since
    # reset_index then emits a column with the index's name, not "index".
    return last_positions.reset_index(drop=True)
306-
30785@task
30886def load_last_positions (last_positions ):
30987 load (
@@ -321,29 +99,10 @@ def load_last_positions(last_positions):
32199def last_positions_flow (minutes : int = 5 , action : str = "update" ,):
322100 action = validate_action (action )
323101 last_positions = extract_last_positions (minutes = minutes )
324- last_positions = add_vessel_id (last_positions )
325- last_positions = drop_duplicates (last_positions )
326102
327103 if action == "update" :
328104 previous_last_positions = extract_previous_last_positions .submit ()
329- previous_last_positions = drop_duplicates (previous_last_positions )
330- new_last_positions = drop_unchanged_new_last_positions (
331- last_positions , previous_last_positions
332- )
333-
334- (
335- unchanged_previous_last_positions ,
336- new_vessels_last_positions ,
337- last_positions_to_update ,
338- ) = split (previous_last_positions , new_last_positions )
339-
340- last_positions = concatenate (
341- unchanged_previous_last_positions ,
342- new_vessels_last_positions ,
343- last_positions_to_update ,
344- )
345-
346- last_positions = drop_duplicates (last_positions )
105+ last_positions = join (previous_last_positions , last_positions )
347106
348107 # Load
349108 load_last_positions (last_positions )
0 commit comments