Skip to content
2 changes: 0 additions & 2 deletions ci/lint/pydoclint-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1369,8 +1369,6 @@ python/ray/data/read_api.py
DOC101: Function `read_text`: Docstring contains fewer arguments than in function signature.
DOC103: Function `read_text`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [drop_empty_lines: bool].
DOC103: Function `read_numpy`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**numpy_load_args: ]. Arguments in the docstring but not in the function signature: [numpy_load_args: ].
DOC104: Function `read_binary_files`: Arguments are the same in the docstring and the function signature, but are in a different order.
DOC105: Function `read_binary_files`: Argument names match, but type hints in these args do not match: paths, include_paths, filesystem, parallelism, ray_remote_args, arrow_open_stream_args, meta_provider, partition_filter, partitioning, ignore_missing_paths, shuffle, file_extensions, concurrency, override_num_blocks
--------------------
python/ray/data/tests/test_split.py
DOC106: Function `assert_split_assignment`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
Expand Down
28 changes: 28 additions & 0 deletions python/ray/data/_internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
TYPE_CHECKING,
Any,
Callable,
Dict,
Generator,
Iterable,
Iterator,
Expand Down Expand Up @@ -1752,3 +1753,30 @@ def rows_same(actual: pd.DataFrame, expected: pd.DataFrame) -> bool:
expected_items_counts = Counter(frozenset(row.items()) for row in expected_rows)

return actual_items_counts == expected_items_counts


def merge_resources_to_ray_remote_args(
    num_cpus: Optional[float],
    num_gpus: Optional[float],
    memory: Optional[float],
    ray_remote_args: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge explicit resource requests into a Ray remote args dict.

    Hints are ``float`` (not ``int``) because Ray accepts fractional CPU/GPU
    requests, and the public APIs forwarding to this helper declare
    ``Optional[float]``.

    Args:
        num_cpus: The number of CPUs to request, or ``None`` to leave unset.
        num_gpus: The number of GPUs to request, or ``None`` to leave unset.
        memory: The heap memory in bytes to request, or ``None`` to leave
            unset.
        ray_remote_args: Existing Ray remote args to merge into. This dict is
            not mutated; a shallow copy is returned.

    Returns:
        A new dict containing ``ray_remote_args`` plus any non-``None``
        resource values. Explicit values override same-named keys already
        present in ``ray_remote_args``.
    """
    # Copy so the caller's dict is never mutated as a side effect.
    merged = ray_remote_args.copy()
    if num_cpus is not None:
        merged["num_cpus"] = num_cpus
    if num_gpus is not None:
        merged["num_gpus"] = num_gpus
    if memory is not None:
        merged["memory"] = memory
    return merged
128 changes: 112 additions & 16 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
ConsumptionAPI,
_validate_rows_per_file_args,
get_compute_strategy,
merge_resources_to_ray_remote_args,
)
from ray.data.aggregate import AggregateFn, Max, Mean, Min, Std, Sum, Unique
from ray.data.block import (
Expand Down Expand Up @@ -403,14 +404,12 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
concurrency=concurrency,
)

if num_cpus is not None:
ray_remote_args["num_cpus"] = num_cpus

if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

if memory is not None:
ray_remote_args["memory"] = memory
ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)

plan = self._plan.copy()
map_op = MapRows(
Expand Down Expand Up @@ -793,6 +792,9 @@ def with_column(
self,
column_name: str,
expr: Expr,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
**ray_remote_args,
) -> "Dataset":
"""
Expand Down Expand Up @@ -827,6 +829,11 @@ def with_column(
Args:
column_name: The name of the new column.
expr: An expression that defines the new column values.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
**ray_remote_args: Additional resource requirements to request from
Ray for the map tasks (e.g., `num_gpus=1`).

Expand All @@ -840,6 +847,13 @@ def with_column(
# TODO: Once the expression API supports UDFs, we can clean up the code here.
from ray.data.expressions import DownloadExpr

ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)

plan = self._plan.copy()
if isinstance(expr, DownloadExpr):
download_op = Download(
Expand Down Expand Up @@ -871,6 +885,9 @@ def add_column(
*,
batch_format: Optional[str] = "pandas",
compute: Optional[str] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[int] = None,
**ray_remote_args,
) -> "Dataset":
Expand Down Expand Up @@ -911,6 +928,11 @@ def add_column(
``pyarrow.Table``. If ``"numpy"``, batches are
``Dict[str, numpy.ndarray]``.
compute: This argument is deprecated. Use ``concurrency`` argument.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The maximum number of Ray workers to use concurrently.
ray_remote_args: Additional resource requirements to request from
Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
Expand Down Expand Up @@ -967,6 +989,12 @@ def add_column(batch: DataBatch) -> DataBatch:
if not callable(fn):
raise ValueError("`fn` must be callable, got {}".format(fn))

ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)
return self.map_batches(
add_column,
batch_format=batch_format,
Expand All @@ -982,6 +1010,9 @@ def drop_columns(
cols: List[str],
*,
compute: Optional[str] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[int] = None,
**ray_remote_args,
) -> "Dataset":
Expand Down Expand Up @@ -1013,6 +1044,11 @@ def drop_columns(
cols: Names of the columns to drop. If any name does not exist,
an exception is raised. Column names must be unique.
compute: This argument is deprecated. Use ``concurrency`` argument.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, do we actually want to expose this here?

Copy link
Member Author

@owenowenisme owenowenisme Sep 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, users can set these attributes here, so I think it's reasonable to expose them here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, will let @alexeykudinkin decide, but I think for things like drop_columns and add_columns it's probably not that necessary. But let's wait to hear from others.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with @alexeykudinkin offline -- to avoid adding unnecessary complexity, let's avoid exposing these as top-level parameters for non-UDF APIs:

  • drop_columns
  • select_columns
  • rename_columns

You can specify a UDF for add_columns, so I think it's okay to keep there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just removed all the changes in the non-UDF functions, thanks!

concurrency: The maximum number of Ray workers to use concurrently.
ray_remote_args: Additional resource requirements to request from
Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
Expand All @@ -1025,6 +1061,12 @@ def drop_columns(
def drop_columns(batch):
return batch.drop(cols)

ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)
return self.map_batches(
drop_columns,
batch_format="pyarrow",
Expand All @@ -1040,6 +1082,9 @@ def select_columns(
cols: Union[str, List[str]],
*,
compute: Union[str, ComputeStrategy] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[int] = None,
**ray_remote_args,
) -> "Dataset":
Expand Down Expand Up @@ -1076,6 +1121,11 @@ def select_columns(
cols: Names of the columns to select. If a name isn't in the
dataset schema, an exception is raised. Columns also should be unique.
compute: This argument is deprecated. Use ``concurrency`` argument.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The maximum number of Ray workers to use concurrently.
ray_remote_args: Additional resource requirements to request from
Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
Expand Down Expand Up @@ -1107,6 +1157,12 @@ def select_columns(

compute = TaskPoolStrategy(size=concurrency)

ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)
plan = self._plan.copy()
select_op = Project(
self._logical_plan.dag,
Expand All @@ -1123,6 +1179,9 @@ def rename_columns(
self,
names: Union[List[str], Dict[str, str]],
*,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
**ray_remote_args,
):
Expand Down Expand Up @@ -1168,6 +1227,11 @@ def rename_columns(
Args:
names: A dictionary that maps old column names to new column names, or a
list of new column names.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The maximum number of Ray workers to use concurrently.
ray_remote_args: Additional resource requirements to request from
Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
Expand Down Expand Up @@ -1233,6 +1297,12 @@ def rename_columns(

compute = TaskPoolStrategy(size=concurrency)

ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)
plan = self._plan.copy()
select_op = Project(
self._logical_plan.dag,
Expand Down Expand Up @@ -1368,14 +1438,12 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
concurrency=concurrency,
)

if num_cpus is not None:
ray_remote_args["num_cpus"] = num_cpus

if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

if memory is not None:
ray_remote_args["memory"] = memory
ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)

plan = self._plan.copy()
op = FlatMap(
Expand Down Expand Up @@ -1403,6 +1471,9 @@ def filter(
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
**ray_remote_args,
Expand Down Expand Up @@ -1444,6 +1515,11 @@ def filter(
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
compute: This argument is deprecated. Use ``concurrency`` argument.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
Expand Down Expand Up @@ -1518,6 +1594,12 @@ def filter(
f"{type(fn).__name__} instead."
)

ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)
plan = self._plan.copy()
op = Filter(
input_op=self._logical_plan.dag,
Expand Down Expand Up @@ -1667,6 +1749,9 @@ def random_shuffle(
*,
seed: Optional[int] = None,
num_blocks: Optional[int] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
**ray_remote_args,
) -> "Dataset":
"""Randomly shuffle the rows of this :class:`Dataset`.
Expand All @@ -1690,6 +1775,11 @@ def random_shuffle(
Args:
seed: Fix the random seed to use, otherwise one is chosen
based on system randomness.
num_cpus: The number of CPUs to reserve for each parallel shuffle worker.
num_gpus: The number of GPUs to reserve for each parallel shuffle worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel shuffle
worker.
memory: The heap memory in bytes to reserve for each parallel shuffle worker.

Returns:
The shuffled :class:`Dataset`.
Expand All @@ -1702,6 +1792,12 @@ def random_shuffle(
"repartition() instead.", # noqa: E501
)
plan = self._plan.copy()
ray_remote_args = merge_resources_to_ray_remote_args(
num_cpus,
num_gpus,
memory,
ray_remote_args,
)
op = RandomShuffle(
self._logical_plan.dag,
seed=seed,
Expand Down
Loading