Skip to content

Commit 0feca03

Browse files
owenowenismezma2
authored andcommitted
[Data] Elevate num_cpus/gpus and memory as top-level params in most APIs (ray-project#56419)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? This PR: - Add a util method `merge_resources_to_ray_remote_args` to add reaource args : `num_cpus` `num_gpus` `memory` to `ray_remote_args` and a test for it. - Update `read_api.py` and `dataset.py` to elevate num_cpus/gpus and memory as top-level params <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Closes ray-project#54708 <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: Zhiqiang Ma <zhiqiang.ma@intel.com>
1 parent bc03a93 commit 0feca03

File tree

5 files changed

+328
-22
lines changed

5 files changed

+328
-22
lines changed

ci/lint/pydoclint-baseline.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,8 +1369,6 @@ python/ray/data/read_api.py
13691369
DOC101: Function `read_text`: Docstring contains fewer arguments than in function signature.
13701370
DOC103: Function `read_text`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [drop_empty_lines: bool].
13711371
DOC103: Function `read_numpy`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**numpy_load_args: ]. Arguments in the docstring but not in the function signature: [numpy_load_args: ].
1372-
DOC104: Function `read_binary_files`: Arguments are the same in the docstring and the function signature, but are in a different order.
1373-
DOC105: Function `read_binary_files`: Argument names match, but type hints in these args do not match: paths, include_paths, filesystem, parallelism, ray_remote_args, arrow_open_stream_args, meta_provider, partition_filter, partitioning, ignore_missing_paths, shuffle, file_extensions, concurrency, override_num_blocks
13741372
--------------------
13751373
python/ray/data/tests/test_split.py
13761374
DOC106: Function `assert_split_assignment`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature

python/ray/data/_internal/util.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
TYPE_CHECKING,
1717
Any,
1818
Callable,
19+
Dict,
1920
Generator,
2021
Iterable,
2122
Iterator,
@@ -1752,3 +1753,30 @@ def rows_same(actual: pd.DataFrame, expected: pd.DataFrame) -> bool:
17521753
expected_items_counts = Counter(frozenset(row.items()) for row in expected_rows)
17531754

17541755
return actual_items_counts == expected_items_counts
1756+
1757+
1758+
def merge_resources_to_ray_remote_args(
1759+
num_cpus: Optional[int],
1760+
num_gpus: Optional[int],
1761+
memory: Optional[int],
1762+
ray_remote_args: Dict[str, Any],
1763+
) -> Dict[str, Any]:
1764+
"""Convert the given resources to Ray remote args.
1765+
1766+
Args:
1767+
num_cpus: The number of CPUs to be added to the Ray remote args.
1768+
num_gpus: The number of GPUs to be added to the Ray remote args.
1769+
memory: The memory to be added to the Ray remote args.
1770+
ray_remote_args: The Ray remote args to be merged.
1771+
1772+
Returns:
1773+
The converted arguments.
1774+
"""
1775+
ray_remote_args = ray_remote_args.copy()
1776+
if num_cpus is not None:
1777+
ray_remote_args["num_cpus"] = num_cpus
1778+
if num_gpus is not None:
1779+
ray_remote_args["num_gpus"] = num_gpus
1780+
if memory is not None:
1781+
ray_remote_args["memory"] = memory
1782+
return ray_remote_args

python/ray/data/dataset.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
ConsumptionAPI,
9696
_validate_rows_per_file_args,
9797
get_compute_strategy,
98+
merge_resources_to_ray_remote_args,
9899
)
99100
from ray.data.aggregate import AggregateFn, Max, Mean, Min, Std, Sum, Unique
100101
from ray.data.block import (
@@ -403,14 +404,12 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
403404
concurrency=concurrency,
404405
)
405406

406-
if num_cpus is not None:
407-
ray_remote_args["num_cpus"] = num_cpus
408-
409-
if num_gpus is not None:
410-
ray_remote_args["num_gpus"] = num_gpus
411-
412-
if memory is not None:
413-
ray_remote_args["memory"] = memory
407+
ray_remote_args = merge_resources_to_ray_remote_args(
408+
num_cpus,
409+
num_gpus,
410+
memory,
411+
ray_remote_args,
412+
)
414413

415414
plan = self._plan.copy()
416415
map_op = MapRows(
@@ -1365,14 +1364,12 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
13651364
concurrency=concurrency,
13661365
)
13671366

1368-
if num_cpus is not None:
1369-
ray_remote_args["num_cpus"] = num_cpus
1370-
1371-
if num_gpus is not None:
1372-
ray_remote_args["num_gpus"] = num_gpus
1373-
1374-
if memory is not None:
1375-
ray_remote_args["memory"] = memory
1367+
ray_remote_args = merge_resources_to_ray_remote_args(
1368+
num_cpus,
1369+
num_gpus,
1370+
memory,
1371+
ray_remote_args,
1372+
)
13761373

13771374
plan = self._plan.copy()
13781375
op = FlatMap(
@@ -1400,6 +1397,9 @@ def filter(
14001397
fn_kwargs: Optional[Dict[str, Any]] = None,
14011398
fn_constructor_args: Optional[Iterable[Any]] = None,
14021399
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
1400+
num_cpus: Optional[float] = None,
1401+
num_gpus: Optional[float] = None,
1402+
memory: Optional[float] = None,
14031403
concurrency: Optional[Union[int, Tuple[int, int], Tuple[int, int, int]]] = None,
14041404
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
14051405
**ray_remote_args,
@@ -1441,6 +1441,11 @@ def filter(
14411441
This can only be provided if ``fn`` is a callable class. These arguments
14421442
are top-level arguments in the underlying Ray actor construction task.
14431443
compute: This argument is deprecated. Use ``concurrency`` argument.
1444+
num_cpus: The number of CPUs to reserve for each parallel map worker.
1445+
num_gpus: The number of GPUs to reserve for each parallel map worker. For
1446+
example, specify `num_gpus=1` to request 1 GPU for each parallel map
1447+
worker.
1448+
memory: The heap memory in bytes to reserve for each parallel map worker.
14441449
concurrency: The semantics of this argument depend on the type of ``fn``:
14451450
14461451
* If ``fn`` is a function and ``concurrency`` isn't set (default), the
@@ -1515,6 +1520,12 @@ def filter(
15151520
f"{type(fn).__name__} instead."
15161521
)
15171522

1523+
ray_remote_args = merge_resources_to_ray_remote_args(
1524+
num_cpus,
1525+
num_gpus,
1526+
memory,
1527+
ray_remote_args,
1528+
)
15181529
plan = self._plan.copy()
15191530
op = Filter(
15201531
input_op=self._logical_plan.dag,

0 commit comments

Comments
 (0)