Skip to content
5 changes: 5 additions & 0 deletions python/ray/data/_expression_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ray.data.block import DataBatch
from ray.data.expressions import (
AliasExpr,
BinaryExpr,
ColumnExpr,
Expr,
Expand Down Expand Up @@ -165,6 +166,10 @@ def _eval_expr_recursive(

return result

if isinstance(expr, AliasExpr):
# The renaming of the column is handled in the project op planner stage.
return _eval_expr_recursive(expr.expr, batch, ops)

raise TypeError(f"Unsupported expression node: {type(expr).__name__}")


Expand Down
19 changes: 11 additions & 8 deletions python/ray/data/_internal/arrow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,21 @@ def column_names(self) -> List[str]:
return self._table.column_names

def fill_column(self, name: str, value: Any) -> Block:
assert name not in self._table.column_names

import pyarrow.compute as pc

if isinstance(value, pyarrow.Scalar):
type = value.type
# Check if value is array-like - if so, use upsert_column logic
if isinstance(value, (pyarrow.Array, pyarrow.ChunkedArray)):
return self.upsert_column(name, value)
else:
type = pyarrow.infer_type([value])
# Scalar value - use original fill_column logic
if isinstance(value, pyarrow.Scalar):
type = value.type
else:
type = pyarrow.infer_type([value])

array = pyarrow.nulls(len(self._table), type=type)
array = pc.fill_null(array, value)
return self._table.append_column(name, array)
array = pyarrow.nulls(len(self._table), type=type)
array = pc.fill_null(array, value)
return self._table.append_column(name, array)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Inconsistent Column Overwrite Behavior

The ArrowBlockAccessor.fill_column method behaves inconsistently when a column already exists. It correctly uses upsert_column for array-like values, allowing overwrites. However, for scalar values, it still calls append_column, which fails if the column already exists. This prevents overwriting existing columns with scalar values, impacting operations like with_column and differing from the pandas backend.

Fix in Cursor Fix in Web


@classmethod
def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor":
Expand Down
8 changes: 6 additions & 2 deletions python/ray/data/_internal/pandas_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,12 @@ def column_names(self) -> List[str]:
return self._table.columns.tolist()

def fill_column(self, name: str, value: Any) -> Block:
assert name not in self._table.columns

# Check if value is array-like - if so, use upsert_column logic
if isinstance(
value, (pd.Series, np.ndarray, pyarrow.Array, pyarrow.ChunkedArray)
):
return self.upsert_column(name, value)
# Scalar value - use original fill_column logic
return self._table.assign(**{name: value})

@staticmethod
Expand Down
8 changes: 6 additions & 2 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,14 @@ def _project_block(block: Block) -> Block:
# Add/update with expression results
result_block = block
for name, expr in exprs.items():
# Use expr.name if available, otherwise fall back to the dict key name
actual_name = expr.name if expr.name is not None else name
result = eval_expr(expr, result_block)
result_block_accessor = BlockAccessor.for_block(result_block)
result_block = result_block_accessor.upsert_column(name, result)

# fill_column handles both scalars and arrays
result_block = result_block_accessor.fill_column(
actual_name, result
)
block = result_block

# 2. (optional) column projection
Expand Down
62 changes: 60 additions & 2 deletions python/ray/data/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ class Expr(ABC):

data_type: DataType

@property
def name(self) -> str | None:
"""Get the name associated with this expression.

Returns:
The name for expressions that have one (ColumnExpr, AliasExpr),
None otherwise.
"""
return None

@abstractmethod
def structurally_equals(self, other: Any) -> bool:
"""Compare two expression ASTs for structural equality."""
Expand Down Expand Up @@ -208,6 +218,27 @@ def not_in(self, values: Union[List[Any], "Expr"]) -> "Expr":
values = LiteralExpr(values)
return self._bin(values, Operation.NOT_IN)

def alias(self, name: str) -> "Expr":
"""Rename the expression.

This method allows you to assign a new name to an expression result.
This is particularly useful when you want to specify the output column name
directly within the expression rather than as a separate parameter.

Args:
name: The new name for the expression

Returns:
An AliasExpr that wraps this expression with the specified name

Example:
>>> from ray.data.expressions import col, lit
>>> # Create an expression with a new aliased name
>>> expr = (col("price") * col("quantity")).alias("total")
>>> # Can be used with Dataset operations that support named expressions
"""
return AliasExpr(data_type=self.data_type, expr=self, _name=name)


@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False)
Expand All @@ -227,9 +258,14 @@ class ColumnExpr(Expr):
>>> age_expr = col("age") # Creates ColumnExpr(name="age")
"""

name: str
_name: str
data_type: DataType = field(default_factory=lambda: DataType(object), init=False)

@property
def name(self) -> str:
"""Get the column name."""
return self._name

def structurally_equals(self, other: Any) -> bool:
return isinstance(other, ColumnExpr) and self.name == other.name

Expand Down Expand Up @@ -498,6 +534,27 @@ def structurally_equals(self, other: Any) -> bool:
)


@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False)
class AliasExpr(Expr):
"""Expression that represents an alias for an expression."""

expr: Expr
_name: str

@property
def name(self) -> str:
"""Get the alias name."""
return self._name

def structurally_equals(self, other: Any) -> bool:
return (
isinstance(other, AliasExpr)
and self.expr.structurally_equals(other.expr)
and self.name == other.name
)


@PublicAPI(stability="beta")
def col(name: str) -> ColumnExpr:
"""
Expand Down Expand Up @@ -603,8 +660,9 @@ def download(uri_column_name: str) -> DownloadExpr:
"BinaryExpr",
"UnaryExpr",
"UDFExpr",
"udf",
"DownloadExpr",
"AliasExpr",
"udf",
"col",
"lit",
"download",
Expand Down
55 changes: 55 additions & 0 deletions python/ray/data/tests/test_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,64 @@
# Commutative operations are not structurally equal
(col("a") + col("b"), col("b") + col("a"), False),
(lit(1) * col("c"), col("c") * lit(1), False),
# Alias expression tests
(col("a").alias("b"), col("a").alias("b"), True),
(col("a").alias("b"), col("a").alias("c"), False), # Different alias
(col("a").alias("b"), col("b").alias("b"), False), # Different column
((col("a") + 1).alias("result"), (col("a") + 1).alias("result"), True),
(
(col("a") + 1).alias("result"),
(col("a") + 2).alias("result"),
False,
), # Different expr
(col("a").alias("b"), col("a"), False), # Alias vs non-alias
]


@pytest.mark.parametrize(
"expr, alias_name, expected_alias",
[
# (expression, alias_name, expected_alias)
(col("price"), "product_price", "product_price"),
(lit(42), "answer", "answer"),
(col("a") + col("b"), "sum", "sum"),
((col("price") * col("qty")) + lit(5), "total_with_fee", "total_with_fee"),
(col("age") >= lit(18), "is_adult", "is_adult"),
],
ids=["col_alias", "lit_alias", "binary_alias", "complex_alias", "comparison_alias"],
)
def test_alias_functionality(expr, alias_name, expected_alias):
"""Test alias functionality with various expression types."""
import pandas as pd

from ray.data._expression_evaluator import eval_expr

# Test alias creation
aliased_expr = expr.alias(alias_name)
assert aliased_expr.name == expected_alias
assert aliased_expr.expr.structurally_equals(expr)

# Test data type preservation
assert aliased_expr.data_type == expr.data_type

# Test evaluation equivalence
test_data = pd.DataFrame(
{
"price": [10, 20],
"qty": [2, 3],
"a": [1, 2],
"b": [3, 4],
"age": [17, 25],
}
)
original_result = eval_expr(expr, test_data)
aliased_result = eval_expr(aliased_expr, test_data)
if hasattr(original_result, "equals"): # For pandas Series
assert original_result.equals(aliased_result)
else: # For scalars
assert original_result == aliased_result


@pytest.mark.parametrize(
"expr1, expr2, expected",
STRUCTURAL_EQUALITY_TEST_CASES,
Expand Down
85 changes: 85 additions & 0 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -3248,6 +3248,91 @@ def test_with_column_filter_in_pipeline(ray_start_regular_shared):
pd.testing.assert_frame_equal(result_df, expected_df, check_dtype=False)


@pytest.mark.parametrize(
"expr_factory, expected_columns, alias_name, expected_values",
[
(
lambda: col("id").alias("new_id"),
["id", "new_id"],
"new_id",
[0, 1, 2, 3, 4], # Copy of id column
),
(
lambda: (col("id") + 1).alias("id_plus_one"),
["id", "id_plus_one"],
"id_plus_one",
[1, 2, 3, 4, 5], # id + 1
),
(
lambda: (col("id") * 2 + 5).alias("transformed"),
["id", "transformed"],
"transformed",
[5, 7, 9, 11, 13], # id * 2 + 5
),
(
lambda: lit(42).alias("constant"),
["id", "constant"],
"constant",
[42, 42, 42, 42, 42], # lit(42)
),
(
lambda: (col("id") >= 0).alias("is_non_negative"),
["id", "is_non_negative"],
"is_non_negative",
[True, True, True, True, True], # id >= 0
),
(
lambda: (col("id") + 1).alias("id"),
["id"], # Only one column since we're overwriting id
"id",
[1, 2, 3, 4, 5], # id + 1 replaces original id
),
],
ids=[
"col_alias",
"arithmetic_alias",
"complex_alias",
"literal_alias",
"comparison_alias",
"overwrite_existing_column",
],
)
def test_with_column_alias_expressions(
ray_start_regular_shared,
expr_factory,
expected_columns,
alias_name,
expected_values,
):
"""Test that alias expressions work correctly with with_column."""
expr = expr_factory()

# Verify the alias name matches what we expect
assert expr.name == alias_name

# Apply the aliased expression
ds = ray.data.range(5).with_column(alias_name, expr)

# Convert to pandas for comprehensive comparison
result_df = ds.to_pandas()

# Create expected DataFrame
expected_df = pd.DataFrame({"id": [0, 1, 2, 3, 4], alias_name: expected_values})

# Ensure column order matches expected_columns
expected_df = expected_df[expected_columns]

# Assert the entire DataFrame is equal
pd.testing.assert_frame_equal(result_df, expected_df)
# Verify the alias expression evaluates the same as the non-aliased version
non_aliased_expr = expr
ds_non_aliased = ray.data.range(5).with_column(alias_name, non_aliased_expr)

non_aliased_df = ds_non_aliased.to_pandas()

pd.testing.assert_frame_equal(result_df, non_aliased_df)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Test Fails Due to Self-Comparison

The test_with_column_alias_expressions intends to compare aliased and non-aliased expressions. However, non_aliased_expr = expr assigns the already aliased expression, leading to a self-comparison. This makes the assertion redundant and the test's stated goal unfulfilled.

Fix in Cursor Fix in Web



if __name__ == "__main__":
import sys

Expand Down