Skip to content
16 changes: 14 additions & 2 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,22 @@ def fn(block: Block) -> Block:
# Add/update with expression results
result_block = block
for name, expr in exprs.items():
actual_name = expr.output_name or name
result = eval_expr(expr, result_block)
result_block_accessor = BlockAccessor.for_block(result_block)
result_block = result_block_accessor.upsert_column(name, result)

# Use fill_column for scalar values, upsert_column for arrays
if not isinstance(
result, (pa.Array, pa.ChunkedArray, pd.Series, np.ndarray)
):
# Scalar value - use fill_column to broadcast it
result_block = result_block_accessor.fill_column(
actual_name, result
)
else:
# Array value - use upsert_column
result_block = result_block_accessor.upsert_column(
actual_name, result
)
block = result_block

# 2. (optional) column projection
Expand Down
28 changes: 26 additions & 2 deletions python/ray/data/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Union
from typing import Any, Callable, Dict, List, Optional, Union

from ray.data.block import BatchColumn
from ray.data.datatype import DataType
Expand Down Expand Up @@ -85,6 +85,8 @@ class Expr(ABC):
"""

data_type: DataType
# output_name is used to rename the expression result when `alias()` is called. Not using `name` to avoid collision with ColumnExpr.name
output_name: Optional[str] = field(default=None, init=False)

@abstractmethod
def structurally_equals(self, other: Any) -> bool:
Expand Down Expand Up @@ -208,6 +210,28 @@ def not_in(self, values: Union[List[Any], "Expr"]) -> "Expr":
values = LiteralExpr(values)
return self._bin(values, Operation.NOT_IN)

def alias(self, name: str) -> "Expr":
"""Rename the expression.

This method allows you to assign a new name to an expression result.
This is particularly useful when you want to specify the output column name
directly within the expression rather than as a separate parameter.

Args:
name: The new name for the expression

Returns:
An Expr that wraps this expression with the specified name

Example:
>>> from ray.data.expressions import col, lit
>>> # Create an expression with a new aliased name
>>> expr = (col("price") * col("quantity")).alias("total")
>>> # Can be used with Dataset operations that support named expressions
"""
object.__setattr__(self, "output_name", name)
return self


@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False)
Expand Down Expand Up @@ -603,8 +627,8 @@ def download(uri_column_name: str) -> DownloadExpr:
"BinaryExpr",
"UnaryExpr",
"UDFExpr",
"udf",
"DownloadExpr",
"udf",
"col",
"lit",
"download",
Expand Down
57 changes: 57 additions & 0 deletions python/ray/data/tests/test_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,66 @@
# Commutative operations are not structurally equal
(col("a") + col("b"), col("b") + col("a"), False),
(lit(1) * col("c"), col("c") * lit(1), False),
# Alias expression tests
(col("a").alias("b"), col("a").alias("b"), True),
(col("a").alias("b"), col("a").alias("c"), False), # Different alias
(col("a").alias("b"), col("b").alias("b"), False), # Different column
((col("a") + 1).alias("result"), (col("a") + 1).alias("result"), True),
(
(col("a") + 1).alias("result"),
(col("a") + 2).alias("result"),
False,
), # Different expr
(col("a").alias("b"), col("a"), False), # Alias vs non-alias
]


@pytest.mark.parametrize(
"expr, alias_name, expected_alias",
[
# (expression, alias_name, expected_alias)
(col("price"), "product_price", "product_price"),
(lit(42), "answer", "answer"),
(col("a") + col("b"), "sum", "sum"),
((col("price") * col("qty")) + lit(5), "total_with_fee", "total_with_fee"),
(col("age") >= lit(18), "is_adult", "is_adult"),
],
ids=["col_alias", "lit_alias", "binary_alias", "complex_alias", "comparison_alias"],
)
def test_alias_functionality(expr, alias_name, expected_alias):
"""Test alias functionality with various expression types."""
import pandas as pd

from ray.data._expression_evaluator import eval_expr
from ray.data.expressions import AliasExpr

# Test alias creation
aliased_expr = expr.alias(alias_name)
assert isinstance(aliased_expr, AliasExpr)
assert aliased_expr.alias == expected_alias
assert aliased_expr.expr.structurally_equals(expr)

# Test data type preservation
assert aliased_expr.data_type == expr.data_type

# Test evaluation equivalence
test_data = pd.DataFrame(
{
"price": [10, 20],
"qty": [2, 3],
"a": [1, 2],
"b": [3, 4],
"age": [17, 25],
}
)
original_result = eval_expr(expr, test_data)
aliased_result = eval_expr(aliased_expr, test_data)
if hasattr(original_result, "equals"): # For pandas Series
assert original_result.equals(aliased_result)
else: # For scalars
assert original_result == aliased_result


@pytest.mark.parametrize(
"expr1, expr2, expected",
STRUCTURAL_EQUALITY_TEST_CASES,
Expand Down
85 changes: 85 additions & 0 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -3276,6 +3276,91 @@ def test_with_column_filter_in_pipeline(ray_start_regular_shared):
pd.testing.assert_frame_equal(result_df, expected_df, check_dtype=False)


@pytest.mark.parametrize(
"expr_factory, expected_columns, alias_name, expected_values",
[
(
lambda: col("id").alias("new_id"),
["id", "new_id"],
"new_id",
[0, 1, 2, 3, 4], # Copy of id column
),
(
lambda: (col("id") + 1).alias("id_plus_one"),
["id", "id_plus_one"],
"id_plus_one",
[1, 2, 3, 4, 5], # id + 1
),
(
lambda: (col("id") * 2 + 5).alias("transformed"),
["id", "transformed"],
"transformed",
[5, 7, 9, 11, 13], # id * 2 + 5
),
(
lambda: lit(42).alias("constant"),
["id", "constant"],
"constant",
[42, 42, 42, 42, 42], # lit(42)
),
(
lambda: (col("id") >= 0).alias("is_non_negative"),
["id", "is_non_negative"],
"is_non_negative",
[True, True, True, True, True], # id >= 0
),
(
lambda: (col("id") + 1).alias("id"),
["id"], # Only one column since we're overwriting id
"id",
[1, 2, 3, 4, 5], # id + 1 replaces original id
),
],
ids=[
"col_alias",
"arithmetic_alias",
"complex_alias",
"literal_alias",
"comparison_alias",
"overwrite_existing_column",
],
)
def test_with_column_alias_expressions(
ray_start_regular_shared,
expr_factory,
expected_columns,
alias_name,
expected_values,
):
"""Test that alias expressions work correctly with with_column."""
expr = expr_factory()

# Verify the alias name matches what we expect
assert expr.output_name == alias_name

# Apply the aliased expression
ds = ray.data.range(5).with_column(alias_name, expr)

# Convert to pandas for comprehensive comparison
result_df = ds.to_pandas()

# Create expected DataFrame
expected_df = pd.DataFrame({"id": [0, 1, 2, 3, 4], alias_name: expected_values})

# Ensure column order matches expected_columns
expected_df = expected_df[expected_columns]

# Assert the entire DataFrame is equal
pd.testing.assert_frame_equal(result_df, expected_df)
# Verify the alias expression evaluates the same as the non-aliased version
non_aliased_expr = expr
ds_non_aliased = ray.data.range(5).with_column(alias_name, non_aliased_expr)

non_aliased_df = ds_non_aliased.to_pandas()

pd.testing.assert_frame_equal(result_df, non_aliased_df)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Test Fails Due to Self-Comparison

The test_with_column_alias_expressions intends to compare aliased and non-aliased expressions. However, non_aliased_expr = expr assigns the already aliased expression, leading to a self-comparison. This makes the assertion redundant and the test's stated goal unfulfilled.

Fix in Cursor Fix in Web



if __name__ == "__main__":
import sys

Expand Down