Merged
9 changes: 9 additions & 0 deletions doc/source/data/api/dataset.rst
@@ -5,6 +5,15 @@ Dataset API

.. include:: ray.data.Dataset.rst

Compute Strategy API
--------------------
.. currentmodule:: ray.data
.. autosummary::
:nosignatures:
:toctree: doc/

ActorPoolStrategy
TaskPoolStrategy

Schema
------
15 changes: 10 additions & 5 deletions doc/source/data/transforming-data.rst
@@ -259,9 +259,11 @@ To transform data with a Python class, complete these steps:
1. Implement a class. Perform setup in ``__init__`` and transform data in ``__call__``.

2. Call :meth:`~ray.data.Dataset.map_batches`, :meth:`~ray.data.Dataset.map`, or
:meth:`~ray.data.Dataset.flat_map`. Pass the number of concurrent workers to use with the ``concurrency`` argument. Each worker transforms a partition of data in parallel.
Fixing the number of concurrent workers gives the most predictable performance, but you can also pass a tuple of ``(min, max)`` to allow Ray Data to automatically
scale the number of concurrent workers.
:meth:`~ray.data.Dataset.flat_map`. Pass a compute strategy with the ``compute``
argument to control how many workers Ray uses. Each worker transforms a partition
of data in parallel. Use ``ray.data.TaskPoolStrategy(size=n)`` to cap the number of
concurrent tasks, or ``ray.data.ActorPoolStrategy(...)`` to run callable classes on
a fixed or autoscaling actor pool.

.. tab-set::

@@ -288,7 +290,10 @@ To transform data with a Python class, complete these steps:

ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(TorchPredictor, concurrency=2)
.map_batches(
TorchPredictor,
compute=ray.data.ActorPoolStrategy(size=2),
)
)

.. testcode::
@@ -322,7 +327,7 @@ To transform data with a Python class, complete these steps:
.map_batches(
TorchPredictor,
# Two workers with one GPU each
concurrency=2,
compute=ray.data.ActorPoolStrategy(size=2),
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
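The doc changes above replace integer/tuple ``concurrency`` with explicit strategy objects. As a rough self-contained sketch (plain dataclasses standing in for the real ``ray.data`` classes, with parameter names taken from the docstrings in this PR), the two strategies expose these knobs:

```python
from dataclasses import dataclass
from typing import Optional

# Stand-ins for ray.data.TaskPoolStrategy / ActorPoolStrategy, for
# illustration only; parameter names mirror the docstrings in this PR.

@dataclass
class TaskPoolStrategy:
    # None means Ray Data chooses concurrency from available resources
    # and the number of input blocks; an int caps concurrent tasks.
    size: Optional[int] = None

@dataclass
class ActorPoolStrategy:
    size: Optional[int] = None          # fixed pool of exactly ``size`` actors
    min_size: Optional[int] = None      # autoscaling lower bound
    max_size: Optional[int] = None      # autoscaling upper bound
    initial_size: Optional[int] = None  # starting size when autoscaling

# A capped task pool, a fixed actor pool, and an autoscaling actor pool:
tasks = TaskPoolStrategy(size=8)
fixed = ActorPoolStrategy(size=2)
auto = ActorPoolStrategy(min_size=2, max_size=4, initial_size=3)
```

In real code these would be passed as ``compute=...`` to ``map_batches``, as the examples in the diff show.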
3 changes: 2 additions & 1 deletion python/ray/data/__init__.py
@@ -5,7 +5,7 @@

from ray._private.arrow_utils import get_pyarrow_version

from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy
from ray.data._internal.datasource.tfrecords_datasource import TFXReadOptions
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
@@ -132,6 +132,7 @@
"RowBasedFileDatasink",
"Schema",
"SinkMode",
"TaskPoolStrategy",
"from_daft",
"from_dask",
"from_items",
23 changes: 18 additions & 5 deletions python/ray/data/_internal/compute.py
@@ -3,7 +3,7 @@

from ray.data._internal.execution.interfaces import TaskContext
from ray.data.block import Block, UserDefinedFunction
from ray.util.annotations import DeveloperAPI
from ray.util.annotations import DeveloperAPI, PublicAPI

logger = logging.getLogger(__name__)

@@ -28,8 +28,16 @@ class ComputeStrategy:
pass


@DeveloperAPI
Member: If users are required to use this API, this should be a PublicAPI.

Same with ActorPoolStrategy

Member: Also, can we add both of these to the API reference?

Member Author (@owenowenisme, Oct 11, 2025): Added in dataset api page.
[Screenshot 2025-10-11 at 3:21:14 PM: Dataset API reference page]

@PublicAPI
class TaskPoolStrategy(ComputeStrategy):
"""Specify the task-based compute strategy for a Dataset transform.

TaskPoolStrategy executes dataset transformations using Ray tasks that are
scheduled through a pool. Provide ``size`` to cap the number of concurrent
tasks; leave it unset to allow Ray Data to scale the task count
automatically.
"""

def __init__(
self,
size: Optional[int] = None,
@@ -53,17 +61,22 @@ def __repr__(self) -> str:
return f"TaskPoolStrategy(size={self.size})"


@PublicAPI
class ActorPoolStrategy(ComputeStrategy):
"""Specify the compute strategy for a Dataset transform.
"""Specify the actor-based compute strategy for a Dataset transform.

ActorPoolStrategy specifies that an autoscaling pool of actors should be used
for a given Dataset transform. This is useful for stateful setup of callable
classes.

For a fixed-sized pool of size ``n``, specify ``compute=ActorPoolStrategy(size=n)``.
To autoscale from ``m`` to ``n`` actors, specify
For a fixed-size pool of size ``n``, use ``ActorPoolStrategy(size=n)``.

To autoscale from ``m`` to ``n`` actors, use
``ActorPoolStrategy(min_size=m, max_size=n)``.

To autoscale from ``m`` to ``n`` actors, with an initial size of ``initial``, use
``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.

To increase opportunities for pipelining task dependency prefetching with
computation and avoiding actor startup delays, set max_tasks_in_flight_per_actor
to 2 or greater; to try to decrease the delay due to queueing of tasks on the worker
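The ``__repr__`` shown in the hunk above can be illustrated with a minimal re-implementation (a sketch only; the real class lives in python/ray/data/_internal/compute.py and also validates its arguments):

```python
from typing import Optional

class TaskPoolStrategy:
    """Minimal sketch of the task-pool strategy from this PR."""

    def __init__(self, size: Optional[int] = None):
        # ``size`` caps the number of concurrent Ray tasks; None lets
        # Ray Data scale the task count automatically.
        self.size = size

    def __repr__(self) -> str:
        return f"TaskPoolStrategy(size={self.size})"

print(TaskPoolStrategy(size=4))  # TaskPoolStrategy(size=4)
```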
14 changes: 7 additions & 7 deletions python/ray/data/_internal/util.py
@@ -574,13 +574,6 @@ def get_compute_strategy(
)

if compute is not None:
# Legacy code path to support `compute` argument.
logger.warning(
"The argument ``compute`` is deprecated in Ray 2.9. Please specify "
"argument ``concurrency`` instead. For more information, see "
"https://docs.ray.io/en/master/data/transforming-data.html#"
"stateful-transforms."
)
if is_callable_class and (
compute == "tasks" or isinstance(compute, TaskPoolStrategy)
):
@@ -599,6 +592,13 @@
)
return compute
elif concurrency is not None:
# Legacy code path to support `concurrency` argument.
logger.warning(
"The argument ``concurrency`` is deprecated in Ray 2.51. Please specify "
Member Author: Not sure if this will be included in 2.50, so set to 2.51 for now.

Contributor: Yes this will be 2.51.

"argument ``compute`` instead. For more information, see "
"https://docs.ray.io/en/master/data/transforming-data.html#"
"stateful-transforms."
)
Comment: Bug: Outdated Error Message for Callable Classes

The get_compute_strategy function's error message for callable classes is outdated, incorrectly advising users to specify the deprecated concurrency parameter (e.g., concurrency=n) instead of the preferred compute parameter with an ActorPoolStrategy.

if isinstance(concurrency, tuple):
# Validate tuple length and that all elements are integers
if len(concurrency) not in (2, 3) or not all(
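The reshuffled branches in ``get_compute_strategy`` move the deprecation warning from ``compute`` to ``concurrency``. The legacy mapping it preserves can be sketched as follows (a simplified, hypothetical rendering; the real function also validates inputs, emits the deprecation warning, and returns actual strategy objects):

```python
def legacy_concurrency_to_strategy(concurrency, is_callable_class):
    """Map a deprecated ``concurrency`` value to a (strategy, kwargs) pair."""
    if isinstance(concurrency, tuple):
        if len(concurrency) == 2:
            m, n = concurrency
            return "ActorPoolStrategy", {"min_size": m, "max_size": n}
        m, n, initial = concurrency  # a 3-tuple adds an initial pool size
        return "ActorPoolStrategy", {
            "min_size": m, "max_size": n, "initial_size": initial
        }
    if is_callable_class:
        # Callable classes need stateful workers, so use a fixed actor pool.
        return "ActorPoolStrategy", {"size": concurrency}
    # Plain functions run as Ray tasks, capped at ``concurrency``.
    return "TaskPoolStrategy", {"size": concurrency}
```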
138 changes: 51 additions & 87 deletions python/ray/data/dataset.py
@@ -341,7 +341,18 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
Args:
fn: The function to apply to each row, or a class type
that can be instantiated to create such a callable.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` is not specified, Ray Data uses ``ray.data.TaskPoolStrategy()``, which launches concurrent tasks based on the available resources and number of input blocks.

* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

Contributor: parameter name is initial_size (not initial)

Member Author: Yeah, in ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``, ``initial`` is an int, which means that ``initial_size`` will be set to ``initial`` (a number). Do you think this is misleading?

fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
Expand All @@ -357,27 +368,7 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time prior
@@ -590,7 +581,18 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
The actual size of the batch provided to ``fn`` may be smaller than
``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent
to a given map task. Default ``batch_size`` is ``None``.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` is not specified, Ray Data uses ``ray.data.TaskPoolStrategy()``, which launches concurrent tasks based on the available resources and number of input blocks.

* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

batch_format: If ``"default"`` or ``"numpy"``, batches are
``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
``pandas.DataFrame``. If ``"pyarrow"``, batches are
@@ -620,27 +622,7 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time prior
@@ -1304,7 +1286,18 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
Args:
fn: The function or generator to apply to each record, or a class type
that can be instantiated to create such a callable.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` is not specified, Ray Data uses ``ray.data.TaskPoolStrategy()``, which launches concurrent tasks based on the available resources and number of input blocks.

* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
Expand All @@ -1320,27 +1313,7 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time
@@ -1451,33 +1424,24 @@ def filter(
fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
compute: This argument is deprecated. Use ``concurrency`` argument.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:
compute: The compute strategy to use for the map operation.

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.
* If ``compute`` is not specified, Ray Data uses ``ray.data.TaskPoolStrategy()``, which launches concurrent tasks based on the available resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time