Skip to content

Commit eb5cdf4

Browse files
owenowenismelandscapepainter
authored andcommitted
[Data] Set default strategy for callable class in get_compute_strategy (ray-project#57657)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Instead of raising ValueError when `compute` and `concurrency` is unset for callable class, we use `ActorPoolStrategy(min_size=1, max_size=None)` as default value. <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Follow up for ray-project#57035 <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run pre-commit jobs to lint the changes in this PR. ([pre-commit setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting)) - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
1 parent 9ab66fc commit eb5cdf4

File tree

4 files changed

+24
-22
lines changed

4 files changed

+24
-22
lines changed

python/ray/data/_internal/util.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -577,17 +577,19 @@ def get_compute_strategy(
577577
compute == "tasks" or isinstance(compute, TaskPoolStrategy)
578578
):
579579
raise ValueError(
580-
"``compute`` must specify an actor compute strategy when using a "
581-
f"callable class, but got: {compute}. For example, use "
582-
"``compute=ray.data.ActorPoolStrategy(size=n)``."
580+
f"You specified the callable class {fn} as your UDF with the compute "
581+
f"{compute}, but Ray Data can't schedule callable classes with the task "
582+
f"pool strategy. To fix this error, pass an ActorPoolStrategy to compute or "
583+
f"None to use the default compute strategy."
583584
)
584585
elif not is_callable_class and (
585586
compute == "actors" or isinstance(compute, ActorPoolStrategy)
586587
):
587588
raise ValueError(
588-
f"``compute`` is specified as the actor compute strategy: {compute}, "
589-
f"but ``fn`` is not a callable class: {fn}. Pass a callable class or "
590-
"use the default ``compute`` strategy."
589+
f"You specified the function {fn} as your UDF with the compute "
590+
f"{compute}, but Ray Data can't schedule regular functions with the actor "
591+
f"pool strategy. To fix this error, pass a TaskPoolStrategy to compute or "
592+
f"None to use the default compute strategy."
591593
)
592594
return compute
593595
elif concurrency is not None:
@@ -639,10 +641,7 @@ def get_compute_strategy(
639641
)
640642
else:
641643
if is_callable_class:
642-
raise ValueError(
643-
"``concurrency`` must be specified when using a callable class. "
644-
"For example, use ``concurrency=n`` for a pool of ``n`` workers."
645-
)
644+
return ActorPoolStrategy(min_size=1, max_size=None)
646645
else:
647646
return TaskPoolStrategy()
648647

python/ray/data/dataset.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,12 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
343343
that can be instantiated to create such a callable.
344344
compute: The compute strategy to use for the map operation.
345345
346-
* If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
346+
* If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
347347
348348
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
349349
350+
* If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.
351+
350352
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
351353
352354
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
@@ -583,10 +585,12 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
583585
to a given map task. Default ``batch_size`` is ``None``.
584586
compute: The compute strategy to use for the map operation.
585587
586-
* If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
588+
* If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
587589
588590
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
589591
592+
* If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.
593+
590594
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
591595
592596
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
@@ -1288,10 +1292,12 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
12881292
that can be instantiated to create such a callable.
12891293
compute: The compute strategy to use for the map operation.
12901294
1291-
* If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
1295+
* If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
12921296
12931297
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
12941298
1299+
* If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.
1300+
12951301
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
12961302
12971303
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
@@ -1426,10 +1432,12 @@ def filter(
14261432
are top-level arguments in the underlying Ray actor construction task.
14271433
compute: The compute strategy to use for the map operation.
14281434
1429-
* If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
1435+
* If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
14301436
14311437
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
14321438
1439+
* If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.
1440+
14331441
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
14341442
14351443
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

python/ray/data/grouped_data.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,12 @@ def map_groups(
163163
making an additional copy.
164164
compute: The compute strategy to use for the map operation.
165165
166-
* If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
166+
* If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
167167
168168
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
169169
170+
* If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.
171+
170172
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
171173
172174
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

python/ray/data/tests/test_map.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,6 @@ def __call__(self, x):
307307
with pytest.raises(ValueError, match=error_message):
308308
ds.map(UDFClass, concurrency=concurrency).take_all()
309309

310-
# Test concurrency not set.
311-
result = ds.map(udf).take_all()
312-
assert sorted(extract_values("id", result)) == list(range(10)), result
313-
error_message = "``concurrency`` must be specified when using a callable class."
314-
with pytest.raises(ValueError, match=error_message):
315-
ds.map(UDFClass).take_all()
316-
317310

318311
@pytest.mark.parametrize("udf_kind", ["gen", "func"])
319312
def test_flat_map(

0 commit comments

Comments
 (0)