
Commit 9ceddf2

owenowenisme authored and elliot-barn committed
[Data] Streamline concurrency parameter semantic (#57035)
To remove overloaded semantics, we will undeprecate the `compute` parameter, deprecate the `concurrency` parameter, and ask users to use `ActorPoolStrategy` and `TaskPoolStrategy` directly, which makes the logic straightforward.

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
1 parent 27b11f5 commit 9ceddf2

File tree

6 files changed: +97 additions, -105 deletions


doc/source/data/api/dataset.rst

Lines changed: 9 additions & 0 deletions

@@ -5,6 +5,15 @@ Dataset API
 
 .. include:: ray.data.Dataset.rst
 
+Compute Strategy API
+--------------------
+.. currentmodule:: ray.data
+.. autosummary::
+   :nosignatures:
+   :toctree: doc/
+
+   ActorPoolStrategy
+   TaskPoolStrategy
 
 Schema
 ------

doc/source/data/transforming-data.rst

Lines changed: 10 additions & 5 deletions

@@ -259,9 +259,11 @@ To transform data with a Python class, complete these steps:
 1. Implement a class. Perform setup in ``__init__`` and transform data in ``__call__``.
 
 2. Call :meth:`~ray.data.Dataset.map_batches`, :meth:`~ray.data.Dataset.map`, or
-   :meth:`~ray.data.Dataset.flat_map`. Pass the number of concurrent workers to use with the ``concurrency`` argument. Each worker transforms a partition of data in parallel.
-   Fixing the number of concurrent workers gives the most predictable performance, but you can also pass a tuple of ``(min, max)`` to allow Ray Data to automatically
-   scale the number of concurrent workers.
+   :meth:`~ray.data.Dataset.flat_map`. Pass a compute strategy with the ``compute``
+   argument to control how many workers Ray uses. Each worker transforms a partition
+   of data in parallel. Use ``ray.data.TaskPoolStrategy(size=n)`` to cap the number of
+   concurrent tasks, or ``ray.data.ActorPoolStrategy(...)`` to run callable classes on
+   a fixed or autoscaling actor pool.
 
 .. tab-set::

@@ -288,7 +290,10 @@ To transform data with a Python class, complete these steps:
         ds = (
             ray.data.from_numpy(np.ones((32, 100)))
-            .map_batches(TorchPredictor, concurrency=2)
+            .map_batches(
+                TorchPredictor,
+                compute=ray.data.ActorPoolStrategy(size=2),
+            )
         )
 
 .. testcode::

@@ -322,7 +327,7 @@ To transform data with a Python class, complete these steps:
             .map_batches(
                 TorchPredictor,
                 # Two workers with one GPU each
-                concurrency=2,
+                compute=ray.data.ActorPoolStrategy(size=2),
                 # Batch size is required if you're using GPUs.
                 batch_size=4,
                 num_gpus=1
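The fixed-versus-autoscaling distinction in the rewritten docs comes down to a simple sizing rule: a fixed pool always runs ``size`` workers, while an autoscaling pool tracks demand within ``[min_size, max_size]``. A minimal, Ray-free sketch (the helper name `autoscale_pool_size` is hypothetical, not part of Ray's API):

```python
# Hypothetical sketch of the sizing rule behind a fixed vs. autoscaling pool.
# A fixed pool ignores demand; an autoscaling pool follows it, clamped to
# the configured [min_size, max_size] bounds.
def autoscale_pool_size(queued_tasks: int, min_size: int, max_size: int) -> int:
    """Return the worker count an autoscaling pool would target."""
    if min_size < 1 or max_size < min_size:
        raise ValueError("need 1 <= min_size <= max_size")
    # One worker per queued task, bounded by the configured pool limits.
    return max(min_size, min(queued_tasks, max_size))
```

This is why a fixed size gives the most predictable performance: the clamp never moves, so resource usage is constant regardless of queue depth.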

python/ray/data/__init__.py

Lines changed: 2 additions & 1 deletion

@@ -5,7 +5,7 @@
 from ray._private.arrow_utils import get_pyarrow_version
 
-from ray.data._internal.compute import ActorPoolStrategy
+from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy
 from ray.data._internal.datasource.tfrecords_datasource import TFXReadOptions
 from ray.data._internal.execution.interfaces import (
     ExecutionOptions,

@@ -133,6 +133,7 @@
     "RowBasedFileDatasink",
     "Schema",
     "SinkMode",
+    "TaskPoolStrategy",
     "from_daft",
     "from_dask",
     "from_items",

python/ray/data/_internal/compute.py

Lines changed: 18 additions & 5 deletions

@@ -3,7 +3,7 @@
 from ray.data._internal.execution.interfaces import TaskContext
 from ray.data.block import Block, UserDefinedFunction
-from ray.util.annotations import DeveloperAPI
+from ray.util.annotations import DeveloperAPI, PublicAPI
 
 logger = logging.getLogger(__name__)

@@ -28,8 +28,16 @@ class ComputeStrategy:
     pass
 
 
-@DeveloperAPI
+@PublicAPI
 class TaskPoolStrategy(ComputeStrategy):
+    """Specify the task-based compute strategy for a Dataset transform.
+
+    TaskPoolStrategy executes dataset transformations using Ray tasks that are
+    scheduled through a pool. Provide ``size`` to cap the number of concurrent
+    tasks; leave it unset to allow Ray Data to scale the task count
+    automatically.
+    """
+
     def __init__(
         self,
         size: Optional[int] = None,

@@ -53,17 +61,22 @@ def __repr__(self) -> str:
         return f"TaskPoolStrategy(size={self.size})"
 
 
+@PublicAPI
 class ActorPoolStrategy(ComputeStrategy):
-    """Specify the compute strategy for a Dataset transform.
+    """Specify the actor-based compute strategy for a Dataset transform.
 
     ActorPoolStrategy specifies that an autoscaling pool of actors should be used
     for a given Dataset transform. This is useful for stateful setup of callable
     classes.
 
-    For a fixed-sized pool of size ``n``, specify ``compute=ActorPoolStrategy(size=n)``.
-    To autoscale from ``m`` to ``n`` actors, specify
+    For a fixed-sized pool of size ``n``, use ``ActorPoolStrategy(size=n)``.
+
+    To autoscale from ``m`` to ``n`` actors, use
     ``ActorPoolStrategy(min_size=m, max_size=n)``.
 
+    To autoscale from ``m`` to ``n`` actors, with an initial size of ``initial``, use
+    ``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.
+
     To increase opportunities for pipelining task dependency prefetching with
     computation and avoiding actor startup delays, set max_tasks_in_flight_per_actor
     to 2 or greater; to try to decrease the delay due to queueing of tasks on the worker
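To make the constructor surface concrete, here is a minimal, Ray-free stand-in for the two strategies sketching only what the new docstrings describe (the real classes live in `ray.data._internal.compute`; the validation shown is an assumption, not Ray's exact logic):

```python
# Stand-in sketches of the two public strategies. These mirror only the
# constructor surface described in the docstrings above, not Ray's internals.
from typing import Optional


class TaskPoolStrategy:
    def __init__(self, size: Optional[int] = None):
        # `size` caps concurrent tasks; None means "let Ray Data decide".
        if size is not None and size < 1:
            raise ValueError("`size` must be >= 1")
        self.size = size

    def __repr__(self) -> str:
        return f"TaskPoolStrategy(size={self.size})"


class ActorPoolStrategy:
    def __init__(
        self,
        size: Optional[int] = None,
        min_size: Optional[int] = None,
        max_size: Optional[int] = None,
        initial_size: Optional[int] = None,
    ):
        # A fixed pool (`size=n`) is just min_size == max_size == n.
        if size is not None and (min_size is not None or max_size is not None):
            raise ValueError("Set either `size` or `min_size`/`max_size`, not both.")
        self.min_size = size if size is not None else min_size
        self.max_size = size if size is not None else max_size
        self.initial_size = initial_size
```

Treating `size=n` as `min_size == max_size == n` keeps the fixed and autoscaling cases on one code path, which is one plausible reason the API exposes both spellings.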

python/ray/data/_internal/util.py

Lines changed: 7 additions & 7 deletions

@@ -573,13 +573,6 @@ def get_compute_strategy(
         )
 
     if compute is not None:
-        # Legacy code path to support `compute` argument.
-        logger.warning(
-            "The argument ``compute`` is deprecated in Ray 2.9. Please specify "
-            "argument ``concurrency`` instead. For more information, see "
-            "https://docs.ray.io/en/master/data/transforming-data.html#"
-            "stateful-transforms."
-        )
         if is_callable_class and (
             compute == "tasks" or isinstance(compute, TaskPoolStrategy)
         ):

@@ -598,6 +591,13 @@ def get_compute_strategy(
         )
         return compute
     elif concurrency is not None:
+        # Legacy code path to support `concurrency` argument.
+        logger.warning(
+            "The argument ``concurrency`` is deprecated in Ray 2.51. Please specify "
+            "argument ``compute`` instead. For more information, see "
+            "https://docs.ray.io/en/master/data/transforming-data.html#"
+            "stateful-transforms."
+        )
         if isinstance(concurrency, tuple):
             # Validate tuple length and that all elements are integers
             if len(concurrency) not in (2, 3) or not all(
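The legacy path above translates deprecated `concurrency` values into the new strategies. A self-contained sketch of that mapping, using hypothetical dataclass stand-ins rather than Ray's real classes (the exact internal mapping in `get_compute_strategy` may differ in detail):

```python
# Sketch of the legacy `concurrency` -> compute-strategy translation,
# following the semantics the old docstrings documented.
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass
class TaskPoolStrategy:
    size: Optional[int] = None


@dataclass
class ActorPoolStrategy:
    min_size: Optional[int] = None
    max_size: Optional[int] = None
    initial_size: Optional[int] = None


def from_legacy_concurrency(
    concurrency: Union[int, Tuple[int, ...]], is_callable_class: bool
):
    """Map a deprecated `concurrency` value onto a compute strategy."""
    if isinstance(concurrency, tuple):
        # Tuples only ever configured autoscaling actor pools:
        # (m, n) or (m, n, initial).
        if len(concurrency) not in (2, 3) or not all(
            isinstance(x, int) for x in concurrency
        ):
            raise ValueError("tuple must be (m, n) or (m, n, initial) of ints")
        return ActorPoolStrategy(*concurrency)
    if is_callable_class:
        # An int with a callable class meant a fixed-size actor pool.
        return ActorPoolStrategy(min_size=concurrency, max_size=concurrency)
    # An int with a plain function capped the number of concurrent tasks.
    return TaskPoolStrategy(size=concurrency)
```

This also shows why the old argument was overloaded: the same value (`2`, say) produced a task cap or an actor pool depending on the type of ``fn``, which is exactly the ambiguity the new ``compute`` argument removes.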

python/ray/data/dataset.py

Lines changed: 51 additions & 87 deletions

@@ -341,7 +341,18 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
         Args:
             fn: The function to apply to each row, or a class type
                 that can be instantiated to create such a callable.
-            compute: This argument is deprecated. Use ``concurrency`` argument.
+            compute: The compute strategy to use for the map operation.
+
+                * If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
+
+                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
+
+                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
+
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
+
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
+
             fn_args: Positional arguments to pass to ``fn`` after the first argument.
                 These arguments are top-level arguments to the underlying Ray task.
             fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are

@@ -357,27 +368,7 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
                 example, specify `num_gpus=1` to request 1 GPU for each parallel map
                 worker.
             memory: The heap memory in bytes to reserve for each parallel map worker.
-            concurrency: The semantics of this argument depend on the type of ``fn``:
-
-                * If ``fn`` is a function and ``concurrency`` isn't set (default), the
-                  actual concurrency is implicitly determined by the available
-                  resources and number of input blocks.
-
-                * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
-                  launches *at most* ``n`` concurrent tasks.
-
-                * If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
-                  uses an actor pool with *exactly* ``n`` workers.
-
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers.
-
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
-
-                * If ``fn`` is a class and ``concurrency`` isn't set (default), this
-                  method raises an error.
-
+            concurrency: This argument is deprecated. Use ``compute`` argument.
             ray_remote_args_fn: A function that returns a dictionary of remote args
                 passed to each map worker. The purpose of this argument is to generate
                 dynamic arguments for each actor/task, and will be called each time prior

@@ -590,7 +581,18 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
                 The actual size of the batch provided to ``fn`` may be smaller than
                 ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent
                 to a given map task. Default ``batch_size`` is ``None``.
-            compute: This argument is deprecated. Use ``concurrency`` argument.
+            compute: The compute strategy to use for the map operation.
+
+                * If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
+
+                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
+
+                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
+
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
+
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
+
             batch_format: If ``"default"`` or ``"numpy"``, batches are
                 ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
                 ``pandas.DataFrame``. If ``"pyarrow"``, batches are

@@ -620,27 +622,7 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
                 example, specify `num_gpus=1` to request 1 GPU for each parallel map
                 worker.
             memory: The heap memory in bytes to reserve for each parallel map worker.
-            concurrency: The semantics of this argument depend on the type of ``fn``:
-
-                * If ``fn`` is a function and ``concurrency`` isn't set (default), the
-                  actual concurrency is implicitly determined by the available
-                  resources and number of input blocks.
-
-                * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
-                  launches *at most* ``n`` concurrent tasks.
-
-                * If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
-                  uses an actor pool with *exactly* ``n`` workers.
-
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers.
-
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
-
-                * If ``fn`` is a class and ``concurrency`` isn't set (default), this
-                  method raises an error.
-
+            concurrency: This argument is deprecated. Use ``compute`` argument.
             ray_remote_args_fn: A function that returns a dictionary of remote args
                 passed to each map worker. The purpose of this argument is to generate
                 dynamic arguments for each actor/task, and will be called each time prior

@@ -1304,7 +1286,18 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
         Args:
             fn: The function or generator to apply to each record, or a class type
                 that can be instantiated to create such a callable.
-            compute: This argument is deprecated. Use ``concurrency`` argument.
+            compute: The compute strategy to use for the map operation.
+
+                * If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
+
+                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
+
+                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
+
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
+
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
+
             fn_args: Positional arguments to pass to ``fn`` after the first argument.
                 These arguments are top-level arguments to the underlying Ray task.
             fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are

@@ -1320,27 +1313,7 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
                 example, specify `num_gpus=1` to request 1 GPU for each parallel map
                 worker.
             memory: The heap memory in bytes to reserve for each parallel map worker.
-            concurrency: The semantics of this argument depend on the type of ``fn``:
-
-                * If ``fn`` is a function and ``concurrency`` isn't set (default), the
-                  actual concurrency is implicitly determined by the available
-                  resources and number of input blocks.
-
-                * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
-                  launches *at most* ``n`` concurrent tasks.
-
-                * If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
-                  uses an actor pool with *exactly* ``n`` workers.
-
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers.
-
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
-
-                * If ``fn`` is a class and ``concurrency`` isn't set (default), this
-                  method raises an error.
-
+            concurrency: This argument is deprecated. Use ``compute`` argument.
             ray_remote_args_fn: A function that returns a dictionary of remote args
                 passed to each map worker. The purpose of this argument is to generate
                 dynamic arguments for each actor/task, and will be called each time

@@ -1451,33 +1424,24 @@ def filter(
             fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
                 This can only be provided if ``fn`` is a callable class. These arguments
                 are top-level arguments in the underlying Ray actor construction task.
-            compute: This argument is deprecated. Use ``concurrency`` argument.
-            num_cpus: The number of CPUs to reserve for each parallel map worker.
-            num_gpus: The number of GPUs to reserve for each parallel map worker. For
-                example, specify `num_gpus=1` to request 1 GPU for each parallel map
-                worker.
-            memory: The heap memory in bytes to reserve for each parallel map worker.
-            concurrency: The semantics of this argument depend on the type of ``fn``:
+            compute: The compute strategy to use for the map operation.
 
-                * If ``fn`` is a function and ``concurrency`` isn't set (default), the
-                  actual concurrency is implicitly determined by the available
-                  resources and number of input blocks.
+                * If ``compute`` is not specified, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.
 
-                * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
-                  launches *at most* ``n`` concurrent tasks.
+                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
 
-                * If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
-                  uses an actor pool with *exactly* ``n`` workers.
+                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.
 
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers.
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
 
-                * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
-                  Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
-
-                * If ``fn`` is a class and ``concurrency`` isn't set (default), this
-                  method raises an error.
+                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
 
+            num_cpus: The number of CPUs to reserve for each parallel map worker.
+            num_gpus: The number of GPUs to reserve for each parallel map worker. For
+                example, specify `num_gpus=1` to request 1 GPU for each parallel map
+                worker.
+            memory: The heap memory in bytes to reserve for each parallel map worker.
+            concurrency: This argument is deprecated. Use ``compute`` argument.
             ray_remote_args_fn: A function that returns a dictionary of remote args
                 passed to each map worker. The purpose of this argument is to generate
                 dynamic arguments for each actor/task, and will be called each time

0 commit comments
