Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
*.DS_Store
**/.idea
.vscode
.dvc
__pycache__/
*.py[cod]
*$py.class
*.DS_Store
.ruff_cache/

# ROS2 builds
build
Expand All @@ -15,3 +17,9 @@ packages/navigation/test/data
packages/simulator_2d/test/data
packages/svg_debug_drawer/test/data/im*_out.svg
packages/lidar_map/data

# perception
perception/output*/
perception/run*/
perception/data
perception/.env
5 changes: 5 additions & 0 deletions perception/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
AWS_ACCESS_KEY_ID=access_key_id
AWS_SECRET_ACCESS_KEY=secret_access_key
CVAT_LOGIN=login
CVAT_PASSWORD=password
CVAT_HOST=host
107 changes: 107 additions & 0 deletions perception/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# 🚀 Data Preparation Pipeline for CVAT Annotation

## Project Structure

```tree
perception
├── data # source data folder
│ ├── run_name
│ │ ├── config.yaml
│ │ ├── run_name.mcap
│ │ ├── run_name.insv
│ │ ├── run_name-0.mp4
│ │ ├── run_name-90.mp4
│ │ ├── run_name-180.mp4
│ │ └── run_name-270.mp4
├── output_zip # folder with the assembled data for annotation
│ └── run_name.zip
├── cvat_api
│ └── get_annotations.py # fetches annotations from CVAT
├── pcd_utils
│ └── mcap_to_pcd.py # converts mcap to pcd
├── s3_storage
│ └── zip_on_s3.py # uploads the archive to S3
├── frames_from_mp4.py # slices videos into frames
├── pipeline.py # full pipeline
├── config.py # reads configuration parameters from config.yaml
├── private_config.py # reads sensitive data from environment variables
└── .env
```

## Output Structure

```tree
run_name.zip
├── pointcloud
│ ├── run_name-000.pcd
...
├── related_images
│ ├── run_name-000_pcd
│ │ ├── run_name-000-0.jpg
│ │ ├── run_name-000-90.jpg
│ │ ├── run_name-000-180.jpg
│ │ └── run_name-000-270.jpg
...

```

## 🛠️ Module Overview

### `pcd_utils/mcap_to_pcd.py`
Converts `.mcap` files (ROS2 format) into `.pcd` point cloud frames based on a configured frequency. Supports time alignment with associated videos.

### `cvat_api/get_annotations.py`
Fetches annotation data from CVAT via its API using a specified `job_id`. Simplifies CVAT integration.

### `s3_storage/zip_on_s3.py`
Handles upload and download of ZIP archives to/from AWS S3, used for transferring datasets between environments or for CVAT access.

### `frames_from_mp4.py`
Extracts frames from each of the four perspective videos (`0`, `90`, `180`, `270` degrees) and organizes them for annotation.

### `pipeline.py`
End-to-end automation for preparing a dataset:
1. Extracts frames from the input videos.
2. Converts `.mcap` files to point cloud data.
3. Aligns point clouds and images in time.
4. Packages all data into a structured ZIP archive.
5. Archive can be uploaded to S3 for annotation workflows.

### Configuration

#### `private_config.py`
Retrieves sensitive credentials from environment variables or a `.env` file:

- `AWS_ACCESS_KEY_ID` — AWS S3 Access Key
- `AWS_SECRET_ACCESS_KEY` — AWS S3 Secret Key
- `CVAT_LOGIN` — CVAT username
- `CVAT_PASSWORD` — CVAT password
- `CVAT_HOST` — URL of the CVAT server

#### `config.yaml`

- `frequency`: Sampling interval (in seconds) used for both video frame and point cloud extraction
- `mcap_elapsed`: Time offset (in seconds) to align MCAP data
- `video_elapsed`: Time offset (in seconds) to align video frames

---

## ⚡ Quick Start Guide

1. Install the required dependencies:
```bash
pip install -r requirements.txt
```
2. Create your `.env` file (see `.env.example`) and make sure `config.yaml` is filled in correctly.

3. Place the input files into the data/{run_name} directory:

- 4 video files: `{run_name}-0.mp4`, `{run_name}-90.mp4`, `{run_name}-180.mp4`, and `{run_name}-270.mp4`
- `{run_name}.mcap`
- `{run_name}.insv` (optional)

4. Run the pipeline:

```bash
python pipeline.py {run_name}
```
41 changes: 41 additions & 0 deletions perception/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import os
from typing import Tuple

import yaml


def get_settings(run_name: str) -> Tuple[float, float, int]:
    """
    Load configuration settings from a YAML file in the run directory.

    The config is read from ``data/<run_name>/config.yaml``, resolved relative
    to the directory containing this module.

    :param run_name: Name of the run (used to locate the config folder).
    :return: A tuple containing video_elapsed (float), mcap_elapsed (float),
        and frequency (returned exactly as stored in the config file).
    :raises FileNotFoundError: If the run folder or its config.yaml is missing.
    :raises ValueError: If the config is not a mapping or lacks a required key.
    """
    # NOTE(review): a reviewer suggested using pathlib for path handling,
    # validating the YAML with a pydantic model, and returning a richer
    # per-run "context" object instead of a tuple. Those would add a
    # dependency and change the return type, so they are left as a follow-up.
    required_keys = ("video_elapsed", "mcap_elapsed", "frequency")

    # Resolve paths relative to this .py file rather than the working directory.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    folder_path = os.path.join(base_dir, "data", run_name)
    if not os.path.exists(folder_path):
        raise FileNotFoundError(f"Run folder {folder_path} not found")

    config_path = os.path.join(folder_path, "config.yaml")
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Config file {config_path} not found")

    with open(config_path, "r", encoding="utf-8") as f:
        # An empty file parses to None; treat it as an empty mapping so the
        # missing-key check below reports the problem.
        config_data = yaml.safe_load(f) or {}

    if not isinstance(config_data, dict):
        raise ValueError(
            f"Config is incorrect: {config_path} must contain a mapping"
        )

    missing = [key for key in required_keys if key not in config_data]
    if missing:
        raise ValueError(
            f"Config is incorrect: missing {', '.join(missing)} in {config_path}"
        )

    return (
        float(config_data["video_elapsed"]),
        float(config_data["mcap_elapsed"]),
        config_data["frequency"],
    )
76 changes: 76 additions & 0 deletions perception/cvat_api/get_annotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os

import requests
from private_config import private_settings

# Base URL of the CVAT API, built from the host configured in private_settings.
CVAT_API_URL = f"{private_settings.CVAT_HOST}/api"
# Directory (a relative path) into which downloaded annotations are written.
OUTPUT_DIR = "output_annotations"


def get_auth_token(cvat_username: str, cvat_password: str):
    """
    Log in to CVAT and return the cookies of the login response.

    :param cvat_username: CVAT username.
    :param cvat_password: CVAT password.
    :return: Cookies from the login response, to be passed to later requests.
    :raises Exception: If the server answers with any status other than 200.
    """
    response = requests.post(
        f"{CVAT_API_URL}/auth/login",
        json={"username": cvat_username, "password": cvat_password},
        # requests has no default timeout; bound the wait so an unreachable
        # host cannot hang the script forever.
        timeout=30,
    )

    # NOTE(review): a reviewer suggested not bothering with docstrings and
    # detailed errors here (a bare `assert response.status_code == 200` would
    # do), since pipeline.py is the main entry point. The explicit raise is
    # kept because asserts are stripped under `python -O`.
    if response.status_code != 200:
        raise Exception(
            f"Failed to get auth token: {response.status_code}, {response.text}"
        )
    return response.cookies


def download_annotations(job_id: str, cookies) -> dict:
    """
    Download the annotations of a CVAT job.

    :param job_id: ID of the job on CVAT.
    :param cookies: Authorization cookies (as returned by get_auth_token).
    :return: The decoded JSON body of the annotations response.
    :raises Exception: If the server answers with any status other than 200.
    """
    response = requests.get(
        f"{CVAT_API_URL}/jobs/{job_id}/annotations/",
        cookies=cookies,
        headers={"Accept": "application/vnd.cvat+json"},
        # requests has no default timeout; bound the wait so an unreachable
        # host cannot hang the script forever.
        timeout=30,
    )

    if response.status_code != 200:
        raise Exception(
            f"Failed to download annotations: {response.status_code}, {response.text}"
        )
    return response.json()


def save_annotations(job_id: str, annotations: dict):
    """
    Save annotations to a JSON file inside OUTPUT_DIR.

    The file is named ``job_<job_id>_annotations.json``; OUTPUT_DIR is created
    if it does not exist yet.

    :param job_id: ID of the job on CVAT (used in the file name).
    :param annotations: Annotation data to serialize.
    """
    # NOTE(review): the import used to sit inside the `with open(...)` body,
    # which a reviewer flagged. It is resolved here, before any file is
    # opened or truncated.
    import json

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    file_path = os.path.join(OUTPUT_DIR, f"job_{job_id}_annotations.json")

    with open(file_path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        json.dump(annotations, f, ensure_ascii=False, indent=4)

    print(f"Annotations saved to {file_path}")


if __name__ == "__main__":
    # Script entry point: ask for a job ID, then download and save its
    # annotations using the credentials from private_settings.
    username = private_settings.CVAT_LOGIN
    password = private_settings.CVAT_PASSWORD
    # Strip stray whitespace so it does not end up inside the request URL.
    cvat_job_id = input("Enter CVAT job ID: ").strip()

    try:
        cookies = get_auth_token(username, password)
        job_annotations = download_annotations(cvat_job_id, cookies)
        save_annotations(cvat_job_id, job_annotations)
    except Exception as e:
        # Report the failure and exit with a non-zero status so that calling
        # scripts can detect it (printing alone would exit with status 0).
        raise SystemExit(f"Error: {e}")
109 changes: 109 additions & 0 deletions perception/frames_from_mp4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import os
from typing import List

import cv2


def extract_frames_by_frequency(
    video_paths: List[str], frequency: int, elapsed: float, root_output_dir: str
) -> int:
    """
    Extract frames from video recordings at a fixed interval, starting from a
    given time offset (elapsed), and save them as JPEG images.

    For sample index ``i`` of a video whose file name ends in ``-<angle>``,
    the frame is written to
    ``<root_output_dir>/related_images/<run>-<iii>_pcd/<run>-<iii>-<angle>.jpg``,
    where ``<run>`` is the last path component of ``root_output_dir``.

    Videos that cannot be opened, or whose FPS is reported as 0, are skipped
    with a message instead of aborting the whole run.

    :param video_paths: List of paths to video files.
    :param frequency: Interval between extracted frames, in seconds.
    :param elapsed: Time offset in seconds from the start of the video.
    :param root_output_dir: Root directory where images will be saved.
    :return: Index of the last saved frame batch, or -1 if nothing was saved.
    :raises ValueError: If frequency is not positive.
    """
    if frequency <= 0:
        # A zero interval would divide by zero below; a negative one would
        # silently produce no samples.
        raise ValueError(f"frequency must be positive, got {frequency}")

    os.makedirs(root_output_dir, exist_ok=True)

    # normpath drops a trailing separator, which would otherwise make the
    # basename (and therefore every output file name prefix) empty.
    launch_name = os.path.basename(os.path.normpath(root_output_dir))

    last_images_index = -1

    for video_path in video_paths:
        # The angle is taken as the last "-"-separated field of the file stem.
        angle = os.path.splitext(os.path.basename(video_path))[0].split("-")[-1]
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print(f"Failed to open video file: {video_path}")
                continue

            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps == 0:
                print(f"Could not determine FPS for video: {video_path}")
                continue

            video_duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps
            # Number of whole intervals that fit after the elapsed offset.
            sample_count = max(0, int((video_duration - elapsed) // frequency))

            for i in range(sample_count):
                elapsed_time = elapsed + i * frequency
                sub_dir_name = f"{launch_name}-{str(i).zfill(3)}_pcd"
                output_dir = os.path.join(
                    root_output_dir, "related_images", sub_dir_name
                )
                os.makedirs(output_dir, exist_ok=True)

                # NOTE(review): a reviewer pointed out that seeking by frame
                # number is not guaranteed to be exact with every reader
                # (acceptable for this use case), and that calling ffmpeg
                # instead of OpenCV is an alternative.
                frame_number = int(elapsed_time * fps)
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

                ret, frame = cap.read()
                if not ret:
                    print(
                        f"Failed to extract frame at {elapsed_time:.2f} sec from "
                        f"{video_path}"
                    )
                    continue

                output_file_name = f"{launch_name}-{str(i).zfill(3)}-{angle}.jpg"
                output_path = os.path.join(output_dir, output_file_name)

                # imwrite signals failure through its return value; do not
                # report success or advance the index for an unwritten frame.
                if not cv2.imwrite(output_path, frame):
                    print(
                        f"Failed to save frame at {elapsed_time:.2f} sec "
                        f"from {video_path}: {output_path}"
                    )
                    continue

                print(
                    f"Frame at {elapsed_time:.2f} sec "
                    f"from {video_path} successfully saved: "
                    f"{output_path}"
                )
                last_images_index = max(last_images_index, i)
        finally:
            # Always free the capture handle, including on early `continue`
            # and on unexpected exceptions.
            cap.release()

    return last_images_index


def get_frames_from_mp4_main(
    run_name: str, elapsed: float = 0, frequency: int = 2
) -> int:
    """
    Extract frames from the four per-angle videos of a run.

    The videos are looked up under ``data/<run_name>/`` and the images are
    written under a directory named ``<run_name>``; both paths are relative.

    :param run_name: Name of the run
        (used to find video files and determine output paths).
    :param elapsed: Time offset in seconds from which to start extracting frames.
    :param frequency: Frame extraction interval in seconds.
    :return: Index of the last saved frame batch.
    """
    # One recording per camera angle: 0, 90, 180 and 270 degrees.
    sources = [
        f"data/{run_name}/{run_name}-0.mp4",
        f"data/{run_name}/{run_name}-90.mp4",
        f"data/{run_name}/{run_name}-180.mp4",
        f"data/{run_name}/{run_name}-270.mp4",
    ]
    # The run name doubles as the root directory for the saved images.
    return extract_frames_by_frequency(sources, frequency, elapsed, run_name)


if __name__ == "__main__":
    # Manual run against the run named "example", using the default
    # elapsed offset and extraction interval.
    test_run_name = "example"
    get_frames_from_mp4_main(run_name=test_run_name)
Loading