-
Notifications
You must be signed in to change notification settings - Fork 7.3k
[Data] [2/n] - Add predicate expression support for dataset.filter #56716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
8db6b11
c664348
27be0de
096051c
2d84e77
d250215
7ff4f9e
7758d67
735faa6
b85fb1a
9884ff8
1f6872c
86fb533
633d5e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
| U, | ||
| ) | ||
| from ray.data.context import DataContext | ||
| from ray.data.expressions import Expr | ||
|
|
||
| if TYPE_CHECKING: | ||
| import pandas | ||
|
|
@@ -609,3 +610,13 @@ def iter_rows( | |
| yield row.as_pydict() | ||
| else: | ||
| yield row | ||
|
|
||
| def filter(self, predicate_expr: "Expr") -> "pandas.DataFrame": | ||
| """Filter rows based on a predicate expression.""" | ||
| from ray.data._expression_evaluator import eval_expr | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Let's move this to …
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I'll make this a TODO just to keep the change cleaner.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Yeah, let's do it in a follow-up. But let's do it right away. |
||
|
|
||
| # Evaluate the expression to get a boolean mask | ||
| mask = eval_expr(predicate_expr, self._table) | ||
|
|
||
| # Use pandas boolean indexing | ||
| return self._table[mask] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -215,6 +215,7 @@ def plan_filter_op( | |
| ) | ||
|
|
||
| expression = op._filter_expr | ||
| predicate_expr = op._predicate_expr | ||
| compute = get_compute(op._compute) | ||
| if expression is not None: | ||
|
|
||
|
|
@@ -234,6 +235,26 @@ def filter_batch_fn(block: "pa.Table") -> "pa.Table": | |
| output_block_size_option=output_block_size_option, | ||
| ) | ||
|
|
||
| elif predicate_expr is not None: | ||
| # Ray Data expression path using BlockAccessor | ||
| def filter_block_fn(block: Block) -> Block: | ||
| try: | ||
| block_accessor = BlockAccessor.for_block(block) | ||
| if not block_accessor.num_rows(): | ||
goutamvenkat-anyscale marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return block | ||
| return block_accessor.filter(predicate_expr) | ||
|
|
||
| except Exception as e: | ||
| _try_wrap_udf_exception(e) | ||
goutamvenkat-anyscale marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| transform_fn = BatchMapTransformFn( | ||
| _generate_transform_fn_for_map_batches(filter_block_fn), | ||
| batch_size=None, | ||
| batch_format=BatchFormat.ARROW, | ||
| zero_copy_batch=True, | ||
| is_udf=True, | ||
| output_block_size_option=output_block_size_option, | ||
| ) | ||
goutamvenkat-anyscale marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Bug: Import Error and Inconsistent Exception Handling
The …
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
... Of course it's imported... |
||
| else: | ||
| udf_is_callable_class = isinstance(op._fn, CallableClass) | ||
| filter_fn, init_fn = _get_udf( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.