
Commit 3cbabc2

add filter pushdown docs
1 parent c2d3506 commit 3cbabc2

File tree

2 files changed: +120 -0 lines changed

python/docs/source/user_guide/sql/python_data_source.rst

Lines changed: 115 additions & 0 deletions
@@ -530,6 +530,121 @@ The following example demonstrates how to implement a basic Data Source using Ar
    df.show()

Filter Pushdown in Python Data Sources
--------------------------------------

Filter pushdown is an optimization technique that allows data sources to handle filters natively, reducing the amount of data that needs to be transferred and processed by Spark.

The filter pushdown API was introduced in Spark 4.1. It enables a ``DataSourceReader`` to selectively push down filters from the query to the source.

Filter pushdown for Python Data Sources is disabled by default; set the configuration ``spark.sql.python.filterPushdown.enabled`` to ``true`` to enable it.

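As a minimal sketch of the setup (the short name ``my_source`` and the columns in the filter are placeholders, not part of any real data source), enabling the configuration and running a filtered query could look like this:

.. code-block:: python

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Filter pushdown for Python Data Sources is off by default.
    spark.conf.set("spark.sql.python.filterPushdown.enabled", "true")

    # "my_source" is a placeholder for a source registered via spark.dataSource.register().
    df = spark.read.format("my_source").load()

    # Spark offers the predicates below to the reader's pushFilters() method;
    # any filter the reader does not accept is still evaluated by Spark.
    df.filter("id > 10 AND status = 'active'").show()
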
**How Filter Pushdown Works**

When a query includes filter conditions, Spark can pass these filters to the data source implementation, which can then apply the filters during data retrieval. This is especially beneficial for:

- Data sources backed by formats that allow efficient filtering (e.g. key-value stores)
- APIs that support filtering (e.g. REST and GraphQL APIs)

The data source receives the filters, decides which ones can be pushed down, and returns the remaining filters to Spark to be applied later.

**Implementing Filter Pushdown**

To enable filter pushdown in your Python Data Source, implement the ``pushFilters`` method in your ``DataSourceReader`` class:

.. code-block:: python

    from typing import Iterable, List

    from pyspark.sql.datasource import EqualTo, Filter, GreaterThan, LessThan

    def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
        """
        Parameters
        ----------
        filters : list of Filter objects
            The AND of the filters that Spark would like to push down

        Returns
        -------
        iterable of Filter objects
            Filters that could not be pushed down and still need to be
            evaluated by Spark
        """
        # Process the filters and determine which ones can be handled by the data source
        pushed = []
        for filter in filters:
            if isinstance(filter, (EqualTo, GreaterThan, LessThan)):
                pushed.append(filter)
            # Check for other supported filter types...
            else:
                yield filter  # Let Spark handle unsupported filters

        # Store the pushed filters for use in the partitions() and read() methods
        self.pushed_filters = pushed

**Notes**

``pushFilters()`` is called only if there are filters available to push down.
If it is called, the call happens before ``partitions()``.

**Supported Filter Types**

Spark supports pushing down the following filter types:

.. list-table::
   :header-rows: 1

   * - Filter Type
     - Class
     - SQL Equivalent
   * - Equality
     - ``EqualTo``
     - ``column = constant``
   * - Greater Than
     - ``GreaterThan``
     - ``column > constant``
   * - Greater Than or Equal
     - ``GreaterThanOrEqual``
     - ``column >= constant``
   * - Less Than
     - ``LessThan``
     - ``column < constant``
   * - Less Than or Equal
     - ``LessThanOrEqual``
     - ``column <= constant``
   * - IN list
     - ``In``
     - ``column IN (constants)``
   * - IS NULL
     - ``IsNull``
     - ``column IS NULL``
   * - IS NOT NULL
     - ``IsNotNull``
     - ``column IS NOT NULL``
   * - String Contains
     - ``StringContains``
     - ``column LIKE '%constant%'``
   * - String Starts With
     - ``StringStartsWith``
     - ``column LIKE 'constant%'``
   * - String Ends With
     - ``StringEndsWith``
     - ``column LIKE '%constant'``
   * - NOT
     - ``Not``
     - ``NOT filter``

Only supported filters are passed to the ``pushFilters`` method; predicates that cannot be expressed as one of these filter types are not offered for pushdown and are evaluated by Spark itself.

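If you want to see exactly what your reader is offered, a small diagnostic ``pushFilters`` (just a sketch, not something the API requires) can log the incoming filters and decline them all. For a query such as ``df.filter("id > 10 AND name = 'abc'")``, the list is expected to contain one filter object per conjunct, e.g. a ``GreaterThan`` on ``id`` and an ``EqualTo`` on ``name``:

.. code-block:: python

    from typing import Iterable, List

    from pyspark.sql.datasource import Filter

    def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
        # Diagnostic sketch: print what Spark offers for pushdown, then push nothing down.
        for f in filters:
            print(type(f).__name__, f)
            yield f  # Returning every filter means Spark evaluates them all itself.
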
**Best Practices**

1. **Handle what you can, pass back what you can't**: Implement pushdown for the filter types that your data source can handle efficiently, and return the remaining filters for Spark to process.

2. **Anticipate new filter types**: More filter types may be added in the future, so do not assume that your implementation handles all possible filters.

3. **Use pushed filters throughout the execution**: Store the pushed filters and respect them in both ``partitions()`` and ``read()``; see the sketch after this list.

4. **Test performance**: Compare query performance with and without filter pushdown to ensure it is providing the expected benefits.

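The following is a rough end-to-end sketch of a reader that remembers its pushed filters and honors them while producing rows. The in-memory ``rows`` table, the single-partition layout, and the ``_matches`` helper are illustrative assumptions rather than part of the API; the ``attribute`` and ``value`` fields come from the comparison filter classes in ``pyspark.sql.datasource``:

.. code-block:: python

    from typing import Iterable, Iterator, List, Tuple

    from pyspark.sql.datasource import (
        DataSourceReader,
        EqualTo,
        Filter,
        GreaterThan,
        InputPartition,
    )

    class MyReader(DataSourceReader):
        """Illustrative reader over a small hard-coded in-memory table."""

        def __init__(self) -> None:
            self.pushed_filters: List[Filter] = []
            # Hypothetical data: (id, name) tuples.
            self.rows: List[Tuple[int, str]] = [(1, "a"), (5, "b"), (20, "c")]

        def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
            for f in filters:
                if isinstance(f, (EqualTo, GreaterThan)):
                    self.pushed_filters.append(f)  # We will honor these ourselves
                else:
                    yield f  # Spark keeps evaluating everything else

        def partitions(self) -> List[InputPartition]:
            # A real source could also use self.pushed_filters to prune partitions here.
            return [InputPartition(0)]

        def read(self, partition: InputPartition) -> Iterator[Tuple[int, str]]:
            for row in self.rows:
                if all(self._matches(f, row) for f in self.pushed_filters):
                    yield row

        def _matches(self, f: Filter, row: Tuple[int, str]) -> bool:
            # Illustrative evaluation: column 0 is "id", column 1 is "name".
            value = {"id": row[0], "name": row[1]}[f.attribute[0]]
            if isinstance(f, EqualTo):
                return value == f.value
            if isinstance(f, GreaterThan):
                return value > f.value
            return True
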
Usage Notes
-----------

python/pyspark/sql/datasource.py

Lines changed: 5 additions & 0 deletions
@@ -539,6 +539,11 @@ def pushFilters(self, filters: List["Filter"]) -> Iterable["Filter"]:
This method is allowed to modify `self`. The object must remain picklable.
Modifications to `self` are visible to the `partitions()` and `read()` methods.

Notes
-----
Configuration `spark.sql.python.filterPushdown.enabled` must be set to `true`
when implementing this method.

Examples
--------
Example filters and the resulting arguments passed to pushFilters:
