Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/operators/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ShuffleAggregation,
_combine,
)
from ray.data._internal.logical.operators.join_operator import JoinType
from ray.data._internal.logical.operators import JoinType
from ray.data._internal.util import GiB, MiB
from ray.data._internal.utils.transform_pyarrow import _is_pa_extension_type
from ray.data.block import Block
Expand Down
51 changes: 45 additions & 6 deletions python/ray/data/_internal/logical/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,77 @@
"""Export logical operator classes used by optimizer rules."""
"""Expose logical operator classes in ray.data._internal.logical.operators."""

from .all_to_all_operator import (
from ray.data._internal.logical.operators.all_to_all_operator import (
AbstractAllToAll,
Aggregate,
RandomizeBlocks,
RandomShuffle,
Repartition,
Sort,
)
from .map_operator import (
from ray.data._internal.logical.operators.count_operator import Count
from ray.data._internal.logical.operators.from_operators import (
AbstractFrom,
FromArrow,
FromBlocks,
FromItems,
FromNumpy,
FromPandas,
)
from ray.data._internal.logical.operators.input_data_operator import InputData
from ray.data._internal.logical.operators.join_operator import Join, JoinSide, JoinType
from ray.data._internal.logical.operators.map_operator import (
AbstractMap,
AbstractUDFMap,
Filter,
FlatMap,
MapBatches,
MapRows,
Project,
StreamingRepartition,
)
from .n_ary_operator import Union
from .one_to_one_operator import AbstractOneToOne, Limit
from .read_operator import Read
from ray.data._internal.logical.operators.n_ary_operator import NAry, Union, Zip
from ray.data._internal.logical.operators.one_to_one_operator import (
AbstractOneToOne,
Download,
Limit,
)
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.streaming_split_operator import StreamingSplit
from ray.data._internal.logical.operators.write_operator import Write

__all__ = [
"AbstractAllToAll",
"AbstractFrom",
"AbstractMap",
"AbstractOneToOne",
"AbstractUDFMap",
"Aggregate",
"Count",
"Download",
"Filter",
"FlatMap",
"FromArrow",
"FromBlocks",
"FromItems",
"FromNumpy",
"FromPandas",
"InputData",
"Join",
"JoinSide",
"JoinType",
"Limit",
"MapBatches",
"MapRows",
"NAry",
"Project",
"RandomShuffle",
"RandomizeBlocks",
"Read",
"Repartition",
"Sort",
"StreamingRepartition",
"StreamingSplit",
"Union",
"Write",
"Zip",
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@

from ray.data.block import Schema

__all__ = [
"AbstractAllToAll",
"Aggregate",
"RandomShuffle",
"RandomizeBlocks",
"Repartition",
"Sort",
]


class AbstractAllToAll(LogicalOperator):
"""Abstract class for logical operators should be converted to physical
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from ray.data._internal.logical.interfaces import LogicalOperator

__all__ = [
"Count",
]


class Count(LogicalOperator):
"""Logical operator that represents counting the number of rows in inputs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

ArrowTable = Union["pa.Table", bytes]

__all__ = [
"AbstractFrom",
"FromArrow",
"FromBlocks",
"FromItems",
"FromNumpy",
"FromPandas",
]


class AbstractFrom(LogicalOperator, SourceOperator, metaclass=abc.ABCMeta):
"""Abstract logical operator for `from_*`."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from ray.data._internal.util import unify_schemas_with_validation
from ray.data.block import BlockMetadata

__all__ = [
"InputData",
]


class InputData(LogicalOperator, SourceOperator):
"""Logical operator for input data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from ray.data.dataset import Schema
from ray.data.expressions import Expr

__all__ = [
"Join",
"JoinSide",
"JoinType",
]


class JoinType(Enum):
INNER = "inner"
Expand Down
12 changes: 12 additions & 0 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@
from ray.data.expressions import Expr, StarExpr
from ray.data.preprocessor import Preprocessor

__all__ = [
"AbstractMap",
"AbstractUDFMap",
"Filter",
"FlatMap",
"MapBatches",
"MapRows",
"Project",
"StreamingRepartition",
]


logger = logging.getLogger(__name__)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
PredicatePassThroughBehavior,
)

__all__ = [
"NAry",
"Union",
"Zip",
]


class NAry(LogicalOperator):
"""Base class for n-ary operators, which take multiple input operators."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

from ray.data.block import Schema

__all__ = [
"AbstractOneToOne",
"Download",
"Limit",
]


class AbstractOneToOne(LogicalOperator):
"""Abstract class for one-to-one logical operators, which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from ray.data.datasource.datasource import Datasource, Reader
from ray.data.expressions import Expr

__all__ = [
"Read",
]


class Read(
AbstractMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import NodeIdStr

__all__ = [
"StreamingSplit",
]


class StreamingSplit(LogicalOperator):
"""Logical operator that represents splitting the input data to `n` splits."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from ray.data.datasource.datasink import Datasink
from ray.data.datasource.datasource import Datasource

__all__ = [
"Write",
]


class Write(AbstractMap):
"""Logical operator for write."""
Expand Down
4 changes: 1 addition & 3 deletions python/ray/data/_internal/logical/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

from ray._common.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.map_operator import AbstractUDFMap
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.write_operator import Write
from ray.data._internal.logical.operators import AbstractUDFMap, Read, Write

# The dictionary for the operator name and count.
_recorded_operators = dict()
Expand Down
5 changes: 2 additions & 3 deletions python/ray/data/_internal/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ray.data._internal.logical.interfaces.logical_operator import LogicalOperator
from ray.data._internal.logical.interfaces.logical_plan import LogicalPlan
from ray.data._internal.logical.interfaces.operator import Operator
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators import Read
from ray.data._internal.logical.optimizers import get_plan_conversion_fns
from ray.data._internal.stats import DatasetStats
from ray.data.block import BlockMetadataWithSchema, _take_first_non_empty_schema
Expand Down Expand Up @@ -636,8 +636,7 @@ def has_computed_output(self) -> bool:

def require_preserve_order(self) -> bool:
"""Whether this plan requires to preserve order."""
from ray.data._internal.logical.operators.all_to_all_operator import Sort
from ray.data._internal.logical.operators.n_ary_operator import Zip
from ray.data._internal.logical.operators import Sort, Zip

for op in self._logical_plan.dag.post_order_iter():
if isinstance(op, (Zip, Sort)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ray.data._internal.execution.operators.map_transformer import (
BlockMapTransformFn,
)
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators import Read
from ray.data._internal.output_buffer import OutputBlockSizeOption
from ray.data._internal.planner.plan_read_op import plan_read_op
from ray.data.checkpoint.util import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ray.data._internal.execution.operators.map_transformer import (
BlockMapTransformFn,
)
from ray.data._internal.logical.operators.write_operator import Write
from ray.data._internal.logical.operators import Write
from ray.data._internal.planner.plan_write_op import (
_plan_write_op_internal,
generate_collect_write_stats_fn,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/plan_all_to_all_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ray.data._internal.execution.operators.base_physical_operator import (
AllToAllOperator,
)
from ray.data._internal.logical.operators.all_to_all_operator import (
from ray.data._internal.logical.operators import (
AbstractAllToAll,
Aggregate,
RandomizeBlocks,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/plan_download_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
BlockMapTransformFn,
MapTransformer,
)
from ray.data._internal.logical.operators.one_to_one_operator import Download
from ray.data._internal.logical.operators import Download
from ray.data._internal.output_buffer import OutputBlockSizeOption
from ray.data._internal.util import RetryingPyFileSystem, make_async_gen
from ray.data.block import BlockAccessor
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/plan_read_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
MapTransformer,
)
from ray.data._internal.execution.util import memory_string
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators import Read
from ray.data._internal.output_buffer import OutputBlockSizeOption
from ray.data._internal.util import _warn_on_high_parallelism
from ray.data.block import Block, BlockMetadata
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
RowMapTransformFn,
)
from ray.data._internal.execution.util import make_callable_class_single_threaded
from ray.data._internal.logical.operators.map_operator import (
from ray.data._internal.logical.operators import (
AbstractUDFMap,
Filter,
FlatMap,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/plan_write_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
BlockMapTransformFn,
MapTransformer,
)
from ray.data._internal.logical.operators.write_operator import Write
from ray.data._internal.logical.operators import Write
from ray.data.block import Block, BlockAccessor
from ray.data.context import DataContext
from ray.data.datasource.datasink import Datasink
Expand Down
24 changes: 12 additions & 12 deletions python/ray/data/_internal/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@
LogicalPlan,
PhysicalPlan,
)
from ray.data._internal.logical.operators.all_to_all_operator import (
from ray.data._internal.logical.operators import (
AbstractAllToAll,
)
from ray.data._internal.logical.operators.count_operator import Count
from ray.data._internal.logical.operators.from_operators import AbstractFrom
from ray.data._internal.logical.operators.input_data_operator import InputData
from ray.data._internal.logical.operators.join_operator import Join
from ray.data._internal.logical.operators.map_operator import (
AbstractFrom,
AbstractUDFMap,
Count,
Download,
Filter,
InputData,
Join,
Limit,
Project,
Read,
StreamingRepartition,
StreamingSplit,
Union,
Write,
Zip,
)
from ray.data._internal.logical.operators.n_ary_operator import Union, Zip
from ray.data._internal.logical.operators.one_to_one_operator import Download, Limit
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.streaming_split_operator import StreamingSplit
from ray.data._internal.logical.operators.write_operator import Write
from ray.data._internal.planner.checkpoint import (
plan_read_op_with_checkpoint_filter,
plan_write_op_with_checkpoint_writer,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/randomize_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ray.data._internal.execution.interfaces.transform_fn import (
AllToAllTransformFnResult,
)
from ray.data._internal.logical.operators.all_to_all_operator import RandomizeBlocks
from ray.data._internal.logical.operators import RandomizeBlocks


def generate_randomize_blocks_fn(
Expand Down
Loading