Skip to content

Commit 6a5e4eb

Browse files
feat: change heading, course and speed type. drop useless tasks
1 parent 9939928 commit 6a5e4eb

File tree

7 files changed

+55
-146
lines changed

7 files changed

+55
-146
lines changed

backend/src/main/resources/db/migration/internal/V0.200__create_last_position_table.sql

Lines changed: 0 additions & 12 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
CREATE TABLE public.last_positions
2+
(
3+
id SERIAL PRIMARY KEY,
4+
mmsi INT UNIQUE,
5+
vessel_id INT,
6+
coord GEOMETRY,
7+
status VARCHAR,
8+
course SMALLINT,
9+
heading SMALLINT,
10+
speed SMALLINT,
11+
ts TIMESTAMPTZ
12+
);

pipeline/src/flows/last_positions.py

Lines changed: 1 addition & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,6 @@ def extract_last_positions(minutes: int) -> gpd.GeoDataFrame:
6161
geom_col="coord"
6262
)
6363

64-
@task
65-
def extract_latest_vessels(mmsi: list[int]) -> pd.DataFrame:
66-
"""
67-
Extracts from the latest_vessels materialized view by mmsi
68-
69-
Returns:
70-
pd.DataFrame: DataFrame of the matching vessels, with `vessel_id` and `mmsi` columns.
71-
"""
72-
query = text("""
73-
SELECT
74-
ship_id AS vessel_id,
75-
CASE WHEN mmsi_number IS NOT NULL THEN mmsi_number::integer ELSE NULL END as mmsi
76-
FROM latest_vessels
77-
WHERE mmsi_number IN :mmsi
78-
""").bindparams(
79-
bindparam("mmsi", expanding=True))
80-
81-
engine = create_engine(db="monitorenv_remote")
82-
return pd.read_sql(query, engine, params={"mmsi": [str(x) for x in mmsi]})
83-
8464
@task
8565
def drop_duplicates(positions: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
8666
"""
@@ -105,75 +85,6 @@ def drop_duplicates(positions: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
10585
subset=["vessel_id", "mmsi"],
10686
)
10787

108-
@task
109-
def add_vessel_id(last_positions: gpd.GeoDataFrame) -> pd.DataFrame:
110-
"""
111-
Adds a `vessel_id` column to the input `DataFrame` by:
112-
113-
- querying all vessels in the `latest_vessels` materialized view that have a matching `mmsi`
114-
- matching the found vessels to the input vessels using the `merge_vessel_id`
115-
helper.
116-
117-
Args:
118-
last_positions (gpd.GeoDataFrame): GeoDataFrame of last_positions. Must have an `mmsi` column.
119-
120-
Returns:
121-
pd.DataFrame: Same as input with an added `vessel_id` column.
122-
"""
123-
logger = get_run_logger()
124-
125-
if "vessel_id" in last_positions:
126-
logger.warning(
127-
(
128-
"Column `vessel_id` already present in input DataFrame, "
129-
"returning unmodified input."
130-
)
131-
)
132-
return last_positions
133-
134-
found_vessels = extract_latest_vessels(last_positions.mmsi.dropna().drop_duplicates().to_list())
135-
136-
last_positions_with_vessel_id = merge_vessel_id(last_positions, found_vessels, logger)
137-
138-
return last_positions_with_vessel_id
139-
140-
141-
142-
143-
def merge_vessel_id(
144-
last_positions: gpd.GeoDataFrame, found_vessels: pd.DataFrame, logger: Logger
145-
) -> gpd.GeoDataFrame:
146-
"""
147-
The two input DataFrames are assumed to be:
148-
149-
- a list of last_positions with `mmsi` identifiers
150-
(plus potential other columns) without a `vessel_id` column
151-
- a list of vessels with `mmsi` and
152-
`vessel_id` columns (and no other columns). Typically these are the vessels
153-
that are found in the `latest_vessels` table that match the identifiers of the
154-
`vessels`
155-
156-
The idea is to add the `vessel_id` from the second DataFrame as a new column in the
157-
first DataFrame, by matching the right lines in both DataFrame.
158-
159-
This is done by performing a left join of the input DataFrames
160-
on the `mmsi` column.
161-
162-
The result always has exactly the same lines as the first input DataFrame.
163-
164-
Args:
165-
last_positions (gpd.GeoDataFrame): Vessels to match to a found_vessel
166-
found_vessels (pd.DataFrame): found_vessels to match to a vessel
167-
logger (Logger): Logger instance
168-
169-
Returns:
170-
gpd.GeoDataFrame: Same as `last_positions` with an added `vessel_id` column.
171-
"""
172-
last_positions = last_positions.copy()
173-
merged = last_positions.merge(found_vessels[['mmsi', 'vessel_id']], how='left', on='mmsi')
174-
return merged.reset_index(drop=True)
175-
176-
17788
@task
17889
def extract_previous_last_positions() -> gpd.GeoDataFrame:
17990
"""
@@ -321,15 +232,10 @@ def load_last_positions(last_positions):
321232
def last_positions_flow(minutes: int = 5, action: str = "update",):
322233
action = validate_action(action)
323234
last_positions = extract_last_positions(minutes=minutes)
324-
last_positions = add_vessel_id(last_positions)
325-
last_positions = drop_duplicates(last_positions)
326235

327236
if action == "update":
328237
previous_last_positions = extract_previous_last_positions.submit()
329-
previous_last_positions = drop_duplicates(previous_last_positions)
330-
new_last_positions = drop_unchanged_new_last_positions(
331-
last_positions, previous_last_positions
332-
)
238+
new_last_positions = drop_unchanged_new_last_positions(last_positions, previous_last_positions)
333239

334240
(
335241
unchanged_previous_last_positions,

pipeline/src/queries/monitorenv/compute_last_positions.sql

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ last_positions AS (
2626
SELECT
2727
-- The DISTINCT ON clause is required to remove possible duplicates due to vessels
2828
-- for which we receive each position multiple times
29-
DISTINCT ON (mmsi)
30-
id,
31-
mmsi,
32-
coord,
33-
status,
34-
course,
35-
heading,
36-
speed,
37-
ts
38-
FROM last_positions
29+
DISTINCT ON (lp.mmsi)
30+
lp.id,
31+
lv.ship_id as vessel_id,
32+
lp.mmsi,
33+
lp.coord,
34+
lp.status,
35+
lp.course,
36+
lp.heading,
37+
lp.speed,
38+
lp.ts
39+
FROM last_positions lp
40+
LEFT JOIN latest_vessels lv
41+
ON lp.mmsi = lv.mmsi_number::integer;
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
TRUNCATE public.ais_positions;
22

3-
INSERT INTO public.ais_positions (mmsi, coord, status, course, heading, speed, ts) VALUES (545437273, 'POINT (-29.01623785635482 -47.158559321739695)', '5f0f7dce-00a0-454d-9efe-9179ffe3446c', 0.6666306544602336, 0.8632657225222369, 0.9542721422473159, NOW() - INTERVAL '10 minutes');
4-
INSERT INTO public.ais_positions (mmsi, coord, status, course, heading, speed, ts) VALUES (755136766, 'POINT (-129.56689057250617 53.65961394926839)', 'f34ca743-5f38-4ac0-81bd-42ca3c143a75', 0.5595555648389977, 0.425865948092895, 0.05330508237507581, NOW() - INTERVAL '10 minutes');
5-
INSERT INTO public.ais_positions (mmsi, coord, status, course, heading, speed, ts) VALUES (92030123, 'POINT (11.05964276248551 2.8143566255156713)', '793c063b-6eb0-4e68-bd89-4b3f1b7660ae', 0.522566258271548, 0.14769292694604108, 0.375027306654942, NOW() - INTERVAL '30 minutes');
6-
INSERT INTO public.ais_positions (mmsi, coord, status, course, heading, speed, ts) VALUES (883168729, 'POINT (-104.19945639555388 -36.31227256604617)', '99753082-125d-4b21-b32c-8fe3f2c0dac8', 0.6547152904430241, 0.4734219663496455, 0.10452387237140237, NOW() - INTERVAL '30 minutes');
7-
INSERT INTO public.ais_positions (mmsi, coord, status, course, heading, speed, ts) VALUES (851385830, 'POINT (168.45242103624486 -44.45202385317484)', 'f05d7df1-8ba1-4c8c-8a24-72f6933bef78', 0.8090578643234771, 0.9890203385354814, 0.6750665751075512, NOW() - INTERVAL '1 day');
8-
INSERT INTO public.ais_positions (mmsi, coord, status, course, heading, speed, ts) VALUES (598693403, 'POINT (78.18423409817728 40.52294043382852)', '9d9c1921-68ac-4481-bfa6-6efc00fcab23', 0.05954173427526066, 0.508703514754805, 0.9486036414809003, '2020-12-01 00:00:00 +00:00');
3+
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (1, 545437273, 'POINT (-29.01623785635482 -47.158559321739695)', 'STATUT_1', 6666, 8632, 9542, NOW() - INTERVAL '10 minutes');
4+
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (2, 755136766, 'POINT (-129.56689057250617 53.65961394926839)', 'STATUT_2', 5595, 4258, 533, NOW() - INTERVAL '10 minutes');
5+
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (3, 92030123, 'POINT (11.05964276248551 2.8143566255156713)', 'STATUT_3', 5225, 1476, 3750, NOW() - INTERVAL '30 minutes');
6+
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (4, 883168729, 'POINT (-104.19945639555388 -36.31227256604617)', 'STATUT_4', 6547, 4734, 1045, NOW() - INTERVAL '30 minutes');
7+
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (5, 851385830, 'POINT (168.45242103624486 -44.45202385317484)', 'STATUT_5', 8090, 9890, 6750, NOW() - INTERVAL '1 day');
8+
INSERT INTO public.ais_positions (id, mmsi, coord, status, course, heading, speed, ts) VALUES (6, 598693403, 'POINT (78.18423409817728 40.52294043382852)', 'STATUT_6', 5954, 5087, 9486, '2020-12-01 00:00:00 +00:00');
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
TRUNCATE public.last_positions;
22

3-
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (1, 545437273, 'POINT (-29.01623785635482 -47.158559321739695)', '5f0f7dce-00a0-454d-9efe-9179ffe3446c', 0.6666306544602336, 0.8632657225222369, 0.9542721422473159, '2025-12-11 09:13:22.628718 +00:00');
4-
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (2, 755136766, 'POINT (-129.56689057250617 53.65961394926839)', 'f34ca743-5f38-4ac0-81bd-42ca3c143a75', 0.5595555648389977, 0.425865948092895, 0.05330508237507581, '2025-12-11 09:13:37.640976 +00:00');
5-
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (3, 92030123, 'POINT (11.05964276248551 2.8143566255156713)', '793c063b-6eb0-4e68-bd89-4b3f1b7660ae', 0.522566258271548, 0.14769292694604108, 0.375027306654942, '2025-12-11 09:13:52.632506 +00:00');
6-
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (4, 883168729, 'POINT (-104.19945639555388 -36.31227256604617)', '99753082-125d-4b21-b32c-8fe3f2c0dac8', 0.6547152904430241, 0.4734219663496455, 0.10452387237140237, '2025-12-11 09:14:07.629067 +00:00');
7-
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (5, 851385830, 'POINT (168.45242103624486 -44.45202385317484)', 'f05d7df1-8ba1-4c8c-8a24-72f6933bef78', 0.8090578643234771, 0.9890203385354814, 0.6750665751075512, '2025-12-11 09:14:22.626099 +00:00');
8-
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (6, 598693403, 'POINT (78.18423409817728 40.52294043382852)', '9d9c1921-68ac-4481-bfa6-6efc00fcab23', 0.05954173427526066, 0.508703514754805, 0.9486036414809003, '2025-12-11 09:14:37.627802 +00:00');
3+
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (1, 545437273, 'POINT (-29.01623785635482 -47.158559321739695)', 'STATUS_1', 6666, 8632, 9542, '2025-12-11 09:13:22.628718 +00:00');
4+
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (2, 755136766, 'POINT (-129.56689057250617 53.65961394926839)', 'STATUS_2', 5595, 4258, 533, '2025-12-11 09:13:37.640976 +00:00');
5+
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (3, 92030123, 'POINT (11.05964276248551 2.8143566255156713)', 'STATUS_3', 5225, 1476, 3750, '2025-12-11 09:13:52.632506 +00:00');
6+
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (4, 883168729, 'POINT (-104.19945639555388 -36.31227256604617)', 'STATUS_4', 6547, 4734, 1045, '2025-12-11 09:14:07.629067 +00:00');
7+
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (5, 851385830, 'POINT (168.45242103624486 -44.45202385317484)', 'STATUS_5', 8090, 9890, 6750, '2025-12-11 09:14:22.626099 +00:00');
8+
INSERT INTO public.last_positions (vessel_id, mmsi, coord, status, course, heading, speed, ts) VALUES (6, 598693403, 'POINT (78.18423409817728 40.52294043382852)', 'STATUS_6', 5954, 5087, 9486, '2025-12-11 09:14:37.627802 +00:00');

pipeline/tests/test_flows/test_last_positions.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ def test_extract_previous_last_positions(reset_test_data):
2525

2626
def test_extract_last_positions(reset_test_data):
2727
last_positions = extract_last_positions(minutes=15)
28-
assert last_positions.shape == (2, 8)
28+
assert last_positions.shape == (2, 9)
2929

3030
last_positions = extract_last_positions(minutes=35)
31-
assert last_positions.shape == (4, 8)
31+
assert last_positions.shape == (4, 9)
3232

3333

3434

@@ -39,9 +39,9 @@ def test_load_last_positions(reset_test_data):
3939
"vessel_id": [1, None],
4040
"mmsi": [123456789, 987654321],
4141
"coord": ["POINT(-2.7335 47.6078)", "POINT(-2.7335 47.6078)"],
42-
"speed": [1.0, 0.0],
43-
"course": [302.0, 0.0],
44-
"heading": [1.0, 0.0],
42+
"speed": [1, 0],
43+
"course": [302, 0],
44+
"heading": [1, 0],
4545
"ts": [
4646
datetime(2021, 12, 5, 11, 52, 32),
4747
datetime(2018, 12, 5, 11, 52, 32),
@@ -65,9 +65,9 @@ def test_drop_duplicates():
6565
"vessel_id": [1, 1, 2],
6666
"mmsi": [123456789, 123456789, 987654321],
6767
"coord": ["POINT(-2.7335 47.6078)", "POINT(-2.8001 47.6078)", "POINT(-2.7335 47.6078)"],
68-
"speed": [1.0, 2.0, 0.0],
69-
"course": [302.0, 100.0, 0.0],
70-
"heading": [1.0, 0.0, 0.0],
68+
"speed": [1, 2, 0],
69+
"course": [302, 100, 0],
70+
"heading": [1, 0, 0],
7171
"ts": [
7272
datetime(2021, 12, 5, 11, 52, 32),
7373
datetime(2021, 12, 5, 12, 52, 32),
@@ -125,9 +125,9 @@ def test_split():
125125
"vessel_id": [1, 2, 3],
126126
"mmsi": [123456789, 123456788, 987654321],
127127
"coord": ["POINT(-2.7335 47.6078)", "POINT(-2.8001 47.6078)", "POINT(-2.7335 47.6078)"],
128-
"speed": [1.0, 2.0, 0.0],
129-
"course": [302.0, 100.0, 0.0],
130-
"heading": [1.0, 0.0, 0.0],
128+
"speed": [1, 2, 0],
129+
"course": [302, 100, 0],
130+
"heading": [1, 0, 0],
131131
"ts": [
132132
datetime(2021, 12, 5, 11, 52, 32),
133133
datetime(2021, 12, 5, 11, 52, 32),
@@ -141,9 +141,9 @@ def test_split():
141141
"vessel_id": [1, 2, None, 12],
142142
"mmsi": [123456789, 123456788, 222222222, 333333333],
143143
"coord": ["POINT(-2.7335 47.6078)", "POINT(-2.8001 47.6078)", "POINT(-2.800 47.6078)", "POINT(-2.7336 47.6078)"],
144-
"speed": [1.0, 2.0, 2.0, 0.0],
145-
"course": [302.0, 100.0, 100.0, 0.0],
146-
"heading": [1.0, 0.0, 0.0, 0.0],
144+
"speed": [1, 2, 2, 0],
145+
"course": [302, 100, 100, 0],
146+
"heading": [1, 0, 0, 0],
147147
"ts": [
148148
datetime(2021, 12, 5, 12, 52, 32),
149149
datetime(2021, 12, 5, 12, 52, 32),
@@ -164,9 +164,9 @@ def test_split():
164164
"vessel_id": [1, 2],
165165
"mmsi": [123456789, 123456788],
166166
"coord": ["POINT(-2.7335 47.6078)", "POINT(-2.8001 47.6078)"],
167-
"speed": [1.0, 2.0],
168-
"course": [302.0, 100.0],
169-
"heading": [1.0, 0.0],
167+
"speed": [1, 2],
168+
"course": [302, 100],
169+
"heading": [1, 0],
170170
"ts": [
171171
datetime(2021, 12, 5, 12, 52, 32),
172172
datetime(2021, 12, 5, 12, 52, 32)

0 commit comments

Comments
 (0)