Merged
Changes from 2 commits
15 changes: 10 additions & 5 deletions doc/source/data/transforming-data.rst
@@ -259,9 +259,11 @@ To transform data with a Python class, complete these steps:
1. Implement a class. Perform setup in ``__init__`` and transform data in ``__call__``.

2. Call :meth:`~ray.data.Dataset.map_batches`, :meth:`~ray.data.Dataset.map`, or
:meth:`~ray.data.Dataset.flat_map`. Pass the number of concurrent workers to use with the ``concurrency`` argument. Each worker transforms a partition of data in parallel.
Fixing the number of concurrent workers gives the most predictable performance, but you can also pass a tuple of ``(min, max)`` to allow Ray Data to automatically
scale the number of concurrent workers.
:meth:`~ray.data.Dataset.flat_map`. Pass a compute strategy with the ``compute``
argument to control how many workers Ray uses. Each worker transforms a partition
of data in parallel. Use ``ray.data.TaskPoolStrategy(size=n)`` to cap the number of
concurrent tasks, or ``ray.data.ActorPoolStrategy(...)`` to run callable classes on
a fixed or autoscaling actor pool.
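The split the paragraph above draws can be sketched in plain Python: setup in ``__init__`` runs once per long-lived worker in an actor pool, but once per batch if a callable class is invoked like a stateless task. The names below are illustrative stand-ins, not Ray APIs.

```python
# Illustrative stand-ins only -- no Ray APIs. Shows why a callable class
# (expensive __init__) benefits from a pool of reused workers, while a
# stateless function can run as independent per-batch tasks.

class Predictor:
    constructions = 0  # counts how many times __init__ ran

    def __init__(self):
        # Stands in for expensive setup, e.g. loading model weights.
        Predictor.constructions += 1

    def __call__(self, batch):
        return [x + 1 for x in batch]


def run_as_tasks(batches):
    # Task-pool style: every batch is an independent, stateless invocation,
    # so setup cost is paid once per batch.
    return [Predictor()(batch) for batch in batches]


def run_on_actor_pool(batches, size=2):
    # Actor-pool style: `size` long-lived workers are built once and reused,
    # so setup cost is paid once per worker.
    workers = [Predictor() for _ in range(size)]
    return [workers[i % size](batch) for i, batch in enumerate(batches)]


batches = [[1, 2], [3, 4], [5, 6], [7, 8]]

Predictor.constructions = 0
task_result = run_as_tasks(batches)
task_setups = Predictor.constructions      # once per batch

Predictor.constructions = 0
actor_result = run_on_actor_pool(batches, size=2)
actor_setups = Predictor.constructions     # once per worker
```

Both paths produce identical results; only where the setup cost lands differs, which is why callable classes are routed to an actor pool.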

.. tab-set::

@@ -288,7 +290,10 @@ To transform data with a Python class, complete these steps:

ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(TorchPredictor, concurrency=2)
.map_batches(
TorchPredictor,
compute=ray.data.ActorPoolStrategy(size=2),
)
)

.. testcode::
@@ -322,7 +327,7 @@ To transform data with a Python class, complete these steps:
.map_batches(
TorchPredictor,
# Two workers with one GPU each
concurrency=2,
compute=ray.data.ActorPoolStrategy(size=2),
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
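The fixed and autoscaling pool shapes that ``ActorPoolStrategy`` describes can be modeled with a toy class; the grow/shrink rule here is a stand-in for illustration, not Ray's actual autoscaling policy, and ``ToyActorPool`` is a hypothetical name.

```python
# Toy model of the pool shapes ActorPoolStrategy describes: a fixed pool
# (size=n) and an autoscaling pool (min_size=m, max_size=n, optional
# initial_size). The scaling rule is a stand-in, not Ray's policy.
class ToyActorPool:
    def __init__(self, size=None, min_size=None, max_size=None, initial_size=None):
        if size is not None:
            # Fixed pool: min, max, and initial all collapse to `size`.
            min_size = max_size = initial_size = size
        self.min_size = min_size
        self.max_size = max_size
        self.workers = initial_size if initial_size is not None else min_size

    def scale_up(self):
        # Grow toward max_size one worker at a time when all workers are busy.
        if self.workers < self.max_size:
            self.workers += 1

    def scale_down(self):
        # Shrink toward min_size when workers sit idle.
        if self.workers > self.min_size:
            self.workers -= 1


fixed = ToyActorPool(size=2)
fixed.scale_up()                 # a fixed pool never grows past its size

auto = ToyActorPool(min_size=1, max_size=4, initial_size=2)
for _ in range(3):
    auto.scale_up()              # grows 2 -> 3 -> 4, then capped at max_size
grown = auto.workers

for _ in range(4):
    auto.scale_down()            # shrinks 4 -> 3 -> 2 -> 1, floored at min_size
```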
3 changes: 2 additions & 1 deletion python/ray/data/__init__.py
@@ -5,7 +5,7 @@

from ray._private.arrow_utils import get_pyarrow_version

from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy
from ray.data._internal.datasource.tfrecords_datasource import TFXReadOptions
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
@@ -132,6 +132,7 @@
"RowBasedFileDatasink",
"Schema",
"SinkMode",
"TaskPoolStrategy",
"from_daft",
"from_dask",
"from_items",
1 change: 0 additions & 1 deletion python/ray/data/_internal/compute.py
@@ -28,7 +28,6 @@ class ComputeStrategy:
pass


@DeveloperAPI
Comment (Member): If users are required to use this API, this should be a PublicAPI. Same with ActorPoolStrategy.

Comment (Member): Also, can we add both of these to the API reference?

Reply (@owenowenisme, Member Author, Oct 11, 2025): Added in the Dataset API page. [screenshots]

class TaskPoolStrategy(ComputeStrategy):
def __init__(
self,
14 changes: 7 additions & 7 deletions python/ray/data/_internal/util.py
@@ -574,13 +574,6 @@ def get_compute_strategy(
)

if compute is not None:
# Legacy code path to support `compute` argument.
logger.warning(
"The argument ``compute`` is deprecated in Ray 2.9. Please specify "
"argument ``concurrency`` instead. For more information, see "
"https://docs.ray.io/en/master/data/transforming-data.html#"
"stateful-transforms."
)
if is_callable_class and (
compute == "tasks" or isinstance(compute, TaskPoolStrategy)
):
@@ -599,6 +592,13 @@
)
return compute
elif concurrency is not None:
# Legacy code path to support `concurrency` argument.
logger.warning(
"The argument ``concurrency`` is deprecated in Ray 2.51. Please specify "
Comment (@owenowenisme, Member Author): Not sure if this will be included in 2.50, so set to 2.51 for now.
Reply (Contributor): Yes, this will be 2.51.

"argument ``compute`` instead. For more information, see "
"https://docs.ray.io/en/master/data/transforming-data.html#"
"stateful-transforms."
)
Comment (bot): Bug: outdated error message for callable classes. The ``get_compute_strategy`` error message for callable classes still advises users to pass the deprecated ``concurrency`` parameter (e.g. ``concurrency=n``) instead of the preferred ``compute`` parameter with an ``ActorPoolStrategy``.

if isinstance(concurrency, tuple):
# Validate tuple length and that all elements are integers
if len(concurrency) not in (2, 3) or not all(
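The hunk above reverses the deprecation: ``compute`` becomes the preferred argument and ``concurrency`` the legacy one. A simplified, self-contained sketch of the mapping ``get_compute_strategy`` performs, using stand-in dataclasses rather than the real Ray classes and with validation abbreviated:

```python
# Simplified stand-in for get_compute_strategy's argument mapping. The
# dataclasses below mimic ray.data.TaskPoolStrategy / ActorPoolStrategy
# but are not the real classes; validation is abbreviated.
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskPoolStrategy:
    size: Optional[int] = None


@dataclass
class ActorPoolStrategy:
    min_size: Optional[int] = None
    max_size: Optional[int] = None
    initial_size: Optional[int] = None
    size: Optional[int] = None


def get_compute_strategy(is_callable_class, compute=None, concurrency=None):
    if compute is not None:
        # Preferred path: use the given strategy, after checking it matches fn.
        if is_callable_class and isinstance(compute, TaskPoolStrategy):
            raise ValueError("Callable classes need an actor pool.")
        if not is_callable_class and isinstance(compute, ActorPoolStrategy):
            raise ValueError("Plain functions run as tasks, not actors.")
        return compute
    if concurrency is not None:
        # Legacy path: translate the deprecated `concurrency` argument
        # (the real code also logs a deprecation warning here).
        if isinstance(concurrency, tuple):
            m, n, *rest = concurrency
            initial = rest[0] if rest else None
            return ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)
        if is_callable_class:
            return ActorPoolStrategy(size=concurrency)
        return TaskPoolStrategy(size=concurrency)
    if is_callable_class:
        # Historically, a callable class with neither argument raised an error.
        raise ValueError("Specify a compute strategy for callable classes.")
    return TaskPoolStrategy()
```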
116 changes: 28 additions & 88 deletions python/ray/data/dataset.py
@@ -341,7 +341,12 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
Args:
fn: The function to apply to each row, or a class type
that can be instantiated to create such a callable.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` isn't specified, Ray Data uses ``ray.data.TaskPoolStrategy()`` and determines the concurrency from the available resources and the number of input blocks.
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool of ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
Comment on lines 344 to 354 (Member): This isn't rendering correctly. I think you might need to add the blank lines. [screenshot]
fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
@@ -357,27 +362,7 @@ def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time prior
@@ -590,7 +575,12 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
The actual size of the batch provided to ``fn`` may be smaller than
``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent
to a given map task. Default ``batch_size`` is ``None``.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` isn't specified, Ray Data uses ``ray.data.TaskPoolStrategy()`` and determines the concurrency from the available resources and the number of input blocks.
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool of ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
batch_format: If ``"default"`` or ``"numpy"``, batches are
``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
``pandas.DataFrame``. If ``"pyarrow"``, batches are
@@ -620,27 +610,7 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time prior
@@ -1304,7 +1274,12 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
Args:
fn: The function or generator to apply to each record, or a class type
that can be instantiated to create such a callable.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` isn't specified, Ray Data uses ``ray.data.TaskPoolStrategy()`` and determines the concurrency from the available resources and the number of input blocks.
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool of ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
@@ -1320,27 +1295,7 @@ def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time
@@ -1451,33 +1406,18 @@ def filter(
fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
compute: This argument is deprecated. Use ``concurrency`` argument.
compute: The compute strategy to use for the map operation.

* If ``compute`` isn't specified, Ray Data uses ``ray.data.TaskPoolStrategy()`` and determines the concurrency from the available resources and the number of input blocks.
* Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
* Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool of ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
* Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
memory: The heap memory in bytes to reserve for each parallel map worker.
concurrency: The semantics of this argument depend on the type of ``fn``:

* If ``fn`` is a function and ``concurrency`` isn't set (default), the
actual concurrency is implicitly determined by the available
resources and number of input blocks.

* If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
launches *at most* ``n`` concurrent tasks.

* If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
uses an actor pool with *exactly* ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers.

* If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

* If ``fn`` is a class and ``concurrency`` isn't set (default), this
method raises an error.

concurrency: This argument is deprecated. Use ``compute`` argument.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time