diff --git a/python/ray/data/_expression_evaluator.py b/python/ray/data/_expression_evaluator.py index 6e177e909fa6..506d3a523cbc 100644 --- a/python/ray/data/_expression_evaluator.py +++ b/python/ray/data/_expression_evaluator.py @@ -10,6 +10,7 @@ from ray.data.block import DataBatch from ray.data.expressions import ( + AliasExpr, BinaryExpr, ColumnExpr, Expr, @@ -165,6 +166,10 @@ def _eval_expr_recursive( return result + if isinstance(expr, AliasExpr): + # The renaming of the column is handled in the project op planner stage. + return _eval_expr_recursive(expr.expr, batch, ops) + raise TypeError(f"Unsupported expression node: {type(expr).__name__}") diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py index 21866f976b07..22a872c22fe1 100644 --- a/python/ray/data/_internal/arrow_block.py +++ b/python/ray/data/_internal/arrow_block.py @@ -206,18 +206,21 @@ def column_names(self) -> List[str]: return self._table.column_names def fill_column(self, name: str, value: Any) -> Block: - assert name not in self._table.column_names - import pyarrow.compute as pc - if isinstance(value, pyarrow.Scalar): - type = value.type + # Check if value is array-like - if so, use upsert_column logic + if isinstance(value, (pyarrow.Array, pyarrow.ChunkedArray)): + return self.upsert_column(name, value) else: - type = pyarrow.infer_type([value]) + # Scalar value - use original fill_column logic + if isinstance(value, pyarrow.Scalar): + type = value.type + else: + type = pyarrow.infer_type([value]) - array = pyarrow.nulls(len(self._table), type=type) - array = pc.fill_null(array, value) - return self._table.append_column(name, array) + array = pyarrow.nulls(len(self._table), type=type) + array = pc.fill_null(array, value) + return self._table.append_column(name, array) @classmethod def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor": diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index fd43b450b7bd..683bd70055cc 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ -289,8 +289,10 @@ def column_names(self) -> List[str]: return self._table.columns.tolist() def fill_column(self, name: str, value: Any) -> Block: - assert name not in self._table.columns - + # Check if value is array-like - if so, use upsert_column logic + if isinstance(value, (pd.Series, np.ndarray)): + return self.upsert_column(name, value) + # Scalar value - use original fill_column logic return self._table.assign(**{name: value}) @staticmethod diff --git a/python/ray/data/_internal/planner/plan_udf_map_op.py b/python/ray/data/_internal/planner/plan_udf_map_op.py index adc8d3157471..6f9dcc5a885f 100644 --- a/python/ray/data/_internal/planner/plan_udf_map_op.py +++ b/python/ray/data/_internal/planner/plan_udf_map_op.py @@ -130,10 +130,14 @@ def _project_block(block: Block) -> Block: # Add/update with expression results result_block = block for name, expr in exprs.items(): + # Use expr.name if available, otherwise fall back to the dict key name + actual_name = expr.name if expr.name is not None else name result = eval_expr(expr, result_block) result_block_accessor = BlockAccessor.for_block(result_block) - result_block = result_block_accessor.upsert_column(name, result) - + # fill_column handles both scalars and arrays + result_block = result_block_accessor.fill_column( + actual_name, result + ) block = result_block # 2. (optional) column projection diff --git a/python/ray/data/expressions.py b/python/ray/data/expressions.py index 2bdd358c0515..a67e41f9dff9 100644 --- a/python/ray/data/expressions.py +++ b/python/ray/data/expressions.py @@ -86,6 +86,16 @@ class Expr(ABC): data_type: DataType + @property + def name(self) -> str | None: + """Get the name associated with this expression. + + Returns: + The name for expressions that have one (ColumnExpr, AliasExpr), + None otherwise. + """ + return None + @abstractmethod def structurally_equals(self, other: Any) -> bool: """Compare two expression ASTs for structural equality.""" @@ -208,6 +218,27 @@ def not_in(self, values: Union[List[Any], "Expr"]) -> "Expr": values = LiteralExpr(values) return self._bin(values, Operation.NOT_IN) + def alias(self, name: str) -> "Expr": + """Rename the expression. + + This method allows you to assign a new name to an expression result. + This is particularly useful when you want to specify the output column name + directly within the expression rather than as a separate parameter. + + Args: + name: The new name for the expression + + Returns: + An AliasExpr that wraps this expression with the specified name + + Example: + >>> from ray.data.expressions import col, lit + >>> # Create an expression with a new aliased name + >>> expr = (col("price") * col("quantity")).alias("total") + >>> # Can be used with Dataset operations that support named expressions + """ + return AliasExpr(data_type=self.data_type, expr=self, _name=name) + @DeveloperAPI(stability="alpha") @dataclass(frozen=True, eq=False) @@ -227,9 +258,14 @@ class ColumnExpr(Expr): >>> age_expr = col("age") # Creates ColumnExpr(name="age") """ - name: str + _name: str data_type: DataType = field(default_factory=lambda: DataType(object), init=False) + @property + def name(self) -> str: + """Get the column name.""" + return self._name + def structurally_equals(self, other: Any) -> bool: return isinstance(other, ColumnExpr) and self.name == other.name @@ -498,6 +534,27 @@ def structurally_equals(self, other: Any) -> bool: ) +@DeveloperAPI(stability="alpha") +@dataclass(frozen=True, eq=False) +class AliasExpr(Expr): + """Expression that represents an alias for an expression.""" + + expr: Expr + _name: str + + @property + def name(self) -> str: + """Get the alias name.""" + return self._name + + def structurally_equals(self, other: Any) -> bool: + return ( + isinstance(other, AliasExpr) + and self.expr.structurally_equals(other.expr) + and self.name == other.name + ) + + @PublicAPI(stability="beta") def col(name: str) -> ColumnExpr: """ @@ -603,8 +660,9 @@ def download(uri_column_name: str) -> DownloadExpr: "BinaryExpr", "UnaryExpr", "UDFExpr", - "udf", "DownloadExpr", + "AliasExpr", + "udf", "col", "lit", "download", diff --git a/python/ray/data/tests/test_expressions.py b/python/ray/data/tests/test_expressions.py index 815ab4465352..087daeba93b2 100644 --- a/python/ray/data/tests/test_expressions.py +++ b/python/ray/data/tests/test_expressions.py @@ -34,9 +34,64 @@ # Commutative operations are not structurally equal (col("a") + col("b"), col("b") + col("a"), False), (lit(1) * col("c"), col("c") * lit(1), False), + # Alias expression tests + (col("a").alias("b"), col("a").alias("b"), True), + (col("a").alias("b"), col("a").alias("c"), False), # Different alias + (col("a").alias("b"), col("b").alias("b"), False), # Different column + ((col("a") + 1).alias("result"), (col("a") + 1).alias("result"), True), + ( + (col("a") + 1).alias("result"), + (col("a") + 2).alias("result"), + False, + ), # Different expr + (col("a").alias("b"), col("a"), False), # Alias vs non-alias ] +@pytest.mark.parametrize( + "expr, alias_name, expected_alias", + [ + # (expression, alias_name, expected_alias) + (col("price"), "product_price", "product_price"), + (lit(42), "answer", "answer"), + (col("a") + col("b"), "sum", "sum"), + ((col("price") * col("qty")) + lit(5), "total_with_fee", "total_with_fee"), + (col("age") >= lit(18), "is_adult", "is_adult"), + ], + ids=["col_alias", "lit_alias", "binary_alias", "complex_alias", "comparison_alias"], +) +def test_alias_functionality(expr, alias_name, expected_alias): + """Test alias functionality with various expression types.""" + import pandas as pd + + from ray.data._expression_evaluator import eval_expr + + # Test alias creation + aliased_expr = expr.alias(alias_name) + assert aliased_expr.name == expected_alias + assert aliased_expr.expr.structurally_equals(expr) + + # Test data type preservation + assert aliased_expr.data_type == expr.data_type + + # Test evaluation equivalence + test_data = pd.DataFrame( + { + "price": [10, 20], + "qty": [2, 3], + "a": [1, 2], + "b": [3, 4], + "age": [17, 25], + } + ) + original_result = eval_expr(expr, test_data) + aliased_result = eval_expr(aliased_expr, test_data) + if hasattr(original_result, "equals"): # For pandas Series + assert original_result.equals(aliased_result) + else: # For scalars + assert original_result == aliased_result + + @pytest.mark.parametrize( "expr1, expr2, expected", STRUCTURAL_EQUALITY_TEST_CASES, diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 52383443f253..6e3889326d76 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -3248,6 +3248,91 @@ def test_with_column_filter_in_pipeline(ray_start_regular_shared): pd.testing.assert_frame_equal(result_df, expected_df, check_dtype=False) +@pytest.mark.parametrize( + "expr_factory, expected_columns, alias_name, expected_values", + [ + ( + lambda: col("id").alias("new_id"), + ["id", "new_id"], + "new_id", + [0, 1, 2, 3, 4], # Copy of id column + ), + ( + lambda: (col("id") + 1).alias("id_plus_one"), + ["id", "id_plus_one"], + "id_plus_one", + [1, 2, 3, 4, 5], # id + 1 + ), + ( + lambda: (col("id") * 2 + 5).alias("transformed"), + ["id", "transformed"], + "transformed", + [5, 7, 9, 11, 13], # id * 2 + 5 + ), + ( + lambda: lit(42).alias("constant"), + ["id", "constant"], + "constant", + [42, 42, 42, 42, 42], # lit(42) + ), + ( + lambda: (col("id") >= 0).alias("is_non_negative"), + ["id", "is_non_negative"], + "is_non_negative", + [True, True, True, True, True], # id >= 0 + ), + ( + lambda: (col("id") + 1).alias("id"), + ["id"], # Only one column since we're overwriting id + "id", + [1, 2, 3, 4, 5], # id + 1 replaces original id + ), + ], + ids=[ + "col_alias", + "arithmetic_alias", + "complex_alias", + "literal_alias", + "comparison_alias", + "overwrite_existing_column", + ], +) +def test_with_column_alias_expressions( + ray_start_regular_shared, + expr_factory, + expected_columns, + alias_name, + expected_values, +): + """Test that alias expressions work correctly with with_column.""" + expr = expr_factory() + + # Verify the alias name matches what we expect + assert expr.name == alias_name + + # Apply the aliased expression + ds = ray.data.range(5).with_column(alias_name, expr) + + # Convert to pandas for comprehensive comparison + result_df = ds.to_pandas() + + # Create expected DataFrame + expected_df = pd.DataFrame({"id": [0, 1, 2, 3, 4], alias_name: expected_values}) + + # Ensure column order matches expected_columns + expected_df = expected_df[expected_columns] + + # Assert the entire DataFrame is equal + pd.testing.assert_frame_equal(result_df, expected_df) + # Verify the alias expression evaluates the same as the non-aliased version + non_aliased_expr = expr + ds_non_aliased = ray.data.range(5).with_column(alias_name, non_aliased_expr) + + non_aliased_df = ds_non_aliased.to_pandas() + + pd.testing.assert_frame_equal(result_df, non_aliased_df) + + if __name__ == "__main__": import sys