Skip to content

Commit 6320275

Browse files
[Data] Adding MCAP datasource (#55716)
1 parent 41571b1 commit 6320275

File tree

8 files changed

+856
-2
lines changed

8 files changed

+856
-2
lines changed

.vale/styles/config/vocabularies/Data/accept.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ FLAC
1515
[Ii]nqueue(s)?
1616
[Ll]ookup(s)?
1717
LLM(s)?
18+
MCAP
1819
Modin
1920
[Mm]ultiget(s)?
2021
ndarray(s)?

doc/source/data/api/input_output.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,15 @@ Lance
252252
read_lance
253253
Dataset.write_lance
254254

255+
MCAP (Message Capture)
256+
----------------------
257+
258+
.. autosummary::
259+
:nosignatures:
260+
:toctree: doc/
261+
262+
read_mcap
263+
255264
ClickHouse
256265
----------
257266

python/ray/data/BUILD.bazel

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,6 +1714,20 @@ py_test(
17141714
],
17151715
)
17161716

1717+
py_test(
1718+
name = "test_mcap",
1719+
size = "medium",
1720+
srcs = ["tests/test_mcap.py"],
1721+
tags = [
1722+
"exclusive",
1723+
"team:data",
1724+
],
1725+
deps = [
1726+
":conftest",
1727+
"//:ray_lib",
1728+
],
1729+
)
1730+
17171731
py_test(
17181732
name = "test_delta_sharing",
17191733
size = "small",

python/ray/data/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
read_images,
6060
read_json,
6161
read_lance,
62+
read_mcap,
6263
read_mongo,
6364
read_numpy,
6465
read_parquet,
@@ -163,6 +164,7 @@
163164
"read_images",
164165
"read_json",
165166
"read_lance",
167+
"read_mcap",
166168
"read_numpy",
167169
"read_mongo",
168170
"read_parquet",
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
"""MCAP (Message Capture) datasource for Ray Data.
2+
3+
MCAP is a standardized format for storing timestamped messages from robotics and
4+
autonomous systems, commonly used for sensor data, control commands, and other
5+
time-series data.
6+
"""
7+
8+
import json
9+
import logging
10+
from dataclasses import dataclass
11+
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Union
12+
13+
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
14+
from ray.data._internal.util import _check_import
15+
from ray.data.block import Block
16+
from ray.data.datasource.file_based_datasource import FileBasedDatasource
17+
from ray.util.annotations import DeveloperAPI
18+
19+
if TYPE_CHECKING:
20+
import pyarrow
21+
from mcap.reader import Channel, Message, Schema
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
@dataclass
class TimeRange:
    """Half-open time interval used to filter MCAP messages.

    Attributes:
        start_time: Start time in nanoseconds (inclusive).
        end_time: End time in nanoseconds (exclusive).
    """

    start_time: int
    end_time: int

    def __post_init__(self):
        """Reject an empty, inverted, or negative interval at construction time."""
        start, end = self.start_time, self.end_time
        if not start < end:
            raise ValueError(
                f"start_time ({start}) must be less than "
                f"end_time ({end})"
            )
        if min(start, end) < 0:
            raise ValueError(
                f"time values must be non-negative, got start_time={start}, "
                f"end_time={end}"
            )
50+
51+
52+
@DeveloperAPI
class MCAPDatasource(FileBasedDatasource):
    """MCAP (Message Capture) datasource for Ray Data.

    This datasource provides reading of MCAP files with predicate pushdown
    optimization for filtering by topics, time ranges, and message types.

    MCAP is a standardized format for storing timestamped messages from robotics and
    autonomous systems, commonly used for sensor data, control commands, and other
    time-series data.

    Examples:
        Basic usage:

        >>> import ray  # doctest: +SKIP
        >>> ds = ray.data.read_mcap("/path/to/data.mcap")  # doctest: +SKIP

        With topic filtering and time range:

        >>> from ray.data.datasource import TimeRange  # doctest: +SKIP
        >>> ds = ray.data.read_mcap(  # doctest: +SKIP
        ...     "/path/to/data.mcap",
        ...     topics={"/camera/image_raw", "/lidar/points"},
        ...     time_range=TimeRange(start_time=1000000000, end_time=2000000000)
        ... )  # doctest: +SKIP

        With multiple files and metadata:

        >>> ds = ray.data.read_mcap(  # doctest: +SKIP
        ...     ["file1.mcap", "file2.mcap"],
        ...     topics={"/camera/image_raw", "/lidar/points"},
        ...     message_types={"sensor_msgs/Image", "sensor_msgs/PointCloud2"},
        ...     include_metadata=True
        ... )  # doctest: +SKIP
    """

    _FILE_EXTENSIONS = ["mcap"]

    def __init__(
        self,
        paths: Union[str, List[str]],
        topics: Optional[Union[List[str], Set[str]]] = None,
        time_range: Optional[TimeRange] = None,
        message_types: Optional[Union[List[str], Set[str]]] = None,
        include_metadata: bool = True,
        **file_based_datasource_kwargs,
    ):
        """Initialize MCAP datasource.

        Args:
            paths: Path or list of paths to MCAP files.
            topics: Optional list/set of topic names to include. If specified,
                only messages from these topics will be read.
            time_range: Optional TimeRange for filtering messages by timestamp.
                TimeRange contains start_time and end_time in nanoseconds, where
                both values must be non-negative and start_time < end_time.
            message_types: Optional list/set of message type names (schema names)
                to include. Only messages with matching schema names will be read.
            include_metadata: Whether to include MCAP metadata fields in the output.
                Defaults to True. When True, includes schema, channel, and message
                metadata.
            **file_based_datasource_kwargs: Additional arguments for FileBasedDatasource.
        """
        # Fail fast if the optional `mcap` dependency is missing, before the
        # base class spends time expanding/resolving `paths`.
        _check_import(self, module="mcap", package="mcap")

        super().__init__(paths, **file_based_datasource_kwargs)

        # Convert to sets for O(1) membership tests during reading. An empty
        # or None filter means "no filtering" for that dimension.
        self._topics = set(topics) if topics else None
        self._message_types = set(message_types) if message_types else None
        self._time_range = time_range
        self._include_metadata = include_metadata

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        """Read MCAP file and yield blocks of message data.

        This method implements efficient MCAP reading with predicate pushdown.
        Topic and time-range filters are pushed down to the MCAP reader itself;
        message-type filtering is applied in Python afterwards.

        Args:
            f: File-like object to read from. Must be seekable for MCAP reading.
            path: Path to the MCAP file being processed.

        Yields:
            Block: Blocks of MCAP message data as pyarrow Tables.

        Raises:
            ValueError: If the MCAP file cannot be read or has invalid format.
        """
        # `import mcap` alone does not bind the `reader` submodule (the mcap
        # package's __init__ does not re-export it), so import it explicitly.
        from mcap.reader import make_reader

        reader = make_reader(f)
        # Note: MCAP summaries are optional and iter_messages works without them,
        # so no summary validation is needed here.

        # Push topic and time-range filters down to the MCAP reader.
        messages = reader.iter_messages(
            topics=list(self._topics) if self._topics else None,
            start_time=self._time_range.start_time if self._time_range else None,
            end_time=self._time_range.end_time if self._time_range else None,
            log_time_order=True,
            reverse=False,
        )

        builder = DelegatingBlockBuilder()

        for schema, channel, message in messages:
            # Apply filters that could not be pushed down to the MCAP level.
            if not self._should_include_message(schema, channel, message):
                continue

            builder.add(self._message_to_dict(schema, channel, message, path))

        # Only yield a block if at least one message survived filtering.
        if builder.num_rows() > 0:
            yield builder.build()

    def _should_include_message(
        self, schema: "Schema", channel: "Channel", message: "Message"
    ) -> bool:
        """Check if a message should be included based on filters.

        This method applies Python-level filtering that cannot be pushed down
        to the MCAP library level. Topic and time-range filters are already
        handled by the MCAP reader, so only message_types filtering happens here.

        Args:
            schema: MCAP schema object containing message type information.
            channel: MCAP channel object containing topic and metadata.
            message: MCAP message object containing the actual data.

        Returns:
            True if the message should be included, False otherwise.
        """
        # Message type filter (cannot be pushed down to the MCAP reader).
        # Messages without a schema pass through when a filter is set, matching
        # the original behavior (`schema` must be truthy for the check to apply).
        if self._message_types and schema and schema.name not in self._message_types:
            return False

        return True

    def _message_to_dict(
        self, schema: "Schema", channel: "Channel", message: "Message", path: str
    ) -> Dict[str, Any]:
        """Convert MCAP message to dictionary format.

        This method converts MCAP message objects into a standardized dictionary
        format suitable for Ray Data processing.

        Args:
            schema: MCAP schema object containing message type and encoding info.
            channel: MCAP channel object containing topic and channel metadata.
            message: MCAP message object containing the actual message data.
            path: Path to the source file (for include_paths functionality).

        Returns:
            Dictionary containing message data in Ray Data format.
        """
        # Decode message data based on encoding; only JSON payloads are decoded,
        # everything else is passed through as raw bytes.
        decoded_data = message.data
        if channel.message_encoding == "json" and isinstance(message.data, bytes):
            try:
                decoded_data = json.loads(message.data.decode("utf-8"))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Keep raw bytes if decoding fails (best-effort decode).
                decoded_data = message.data

        # Core message data present in every output row.
        message_data = {
            "data": decoded_data,
            "topic": channel.topic,
            "log_time": message.log_time,
            "publish_time": message.publish_time,
            "sequence": message.sequence,
        }

        # Add schema/channel metadata if requested.
        if self._include_metadata:
            message_data.update(
                {
                    "channel_id": message.channel_id,
                    "message_encoding": channel.message_encoding,
                    "schema_name": schema.name if schema else None,
                    "schema_encoding": schema.encoding if schema else None,
                    "schema_data": schema.data if schema else None,
                }
            )

        # Add file path if include_paths is enabled. FileBasedDatasource stores
        # the ctor flag as `_include_paths`; the public-name fallback is kept
        # for safety. NOTE(review): confirm attribute name against
        # FileBasedDatasource — the original checked only `include_paths`,
        # which would never be set.
        if getattr(self, "_include_paths", False) or getattr(
            self, "include_paths", False
        ):
            message_data["path"] = path

        return message_data

    def get_name(self) -> str:
        """Return a human-readable name for this datasource."""
        return "MCAP"

    @property
    def supports_distributed_reads(self) -> bool:
        """Whether this datasource supports distributed reads.

        MCAP files can be read in parallel across multiple files.
        """
        return True

python/ray/data/datasource/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
from ray.data._internal.datasource.delta_sharing_datasource import (
2+
DeltaSharingDatasource,
3+
)
4+
from ray.data._internal.datasource.mcap_datasource import (
5+
MCAPDatasource,
6+
TimeRange,
7+
)
18
from ray.data._internal.datasource.sql_datasource import Connection
29
from ray.data._internal.savemode import SaveMode
310
from ray.data.datasource.datasink import (
@@ -40,18 +47,18 @@
4047
# ray.data.from_huggingface() or HuggingFaceDatasource() directly.
4148
__all__ = [
4249
"BaseFileMetadataProvider",
43-
"BlockBasedFileDatasink",
4450
"Connection",
4551
"Datasink",
4652
"Datasource",
47-
"DeltaSharingDatasource",
4853
"DefaultFileMetadataProvider",
54+
"DeltaSharingDatasource",
4955
"DummyOutputDatasink",
5056
"FastFileMetadataProvider",
5157
"FileBasedDatasource",
5258
"FileShuffleConfig",
5359
"FileMetadataProvider",
5460
"FilenameProvider",
61+
"MCAPDatasource",
5562
"PartitionStyle",
5663
"PathPartitionFilter",
5764
"PathPartitionParser",
@@ -60,7 +67,9 @@
6067
"ReadTask",
6168
"Reader",
6269
"RowBasedFileDatasink",
70+
"BlockBasedFileDatasink",
6371
"_S3FileSystemWrapper",
72+
"TimeRange",
6473
"WriteResult",
6574
"WriteReturnType",
6675
"SaveMode",

0 commit comments

Comments
 (0)