Add DataFusion datasource implementation in Python for pandas and DataFrame Interchange #438
Conversation
```diff
 @pytest.mark.skipif(pa_major_minor < (11, 0), reason="pyarrow 11+ required")
-@pytest.mark.parametrize("connection", get_connections())
-def test_gh_286(connection):
+def test_gh_286():
```
One thing lost in this PR is the ability to query DataFrame Interchange Protocol objects with the DuckDB connection. This was needed (under the current architecture) to avoid converting to arrow up front.
This file is copied from https://data-apis.org/dataframe-protocol/latest/API.html
This is the Python interface that the Rust logic calls into.
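To make the shape of that interface concrete, here is a minimal sketch of what a Python-side datasource contract could look like. The class and method names (`Datasource`, `schema`, `fetch`) are assumptions for illustration, not VegaFusion's actual API; the key idea from the PR is that only the requested columns are materialized.

```python
from abc import ABC, abstractmethod
from typing import List


class Datasource(ABC):
    """Sketch of a Python datasource interface (hypothetical names)."""

    @abstractmethod
    def schema(self):
        """Return the column names of the underlying table."""

    @abstractmethod
    def fetch(self, columns: List[str]):
        """Return only the requested columns, converted on demand."""


class DictDatasource(Datasource):
    """Toy implementation backed by a dict of column lists."""

    def __init__(self, data):
        self._data = data

    def schema(self):
        return list(self._data)

    def fetch(self, columns):
        # Only the projected columns are touched; the rest are never converted
        return {name: self._data[name] for name in columns}


ds = DictDatasource({"a": [1, 2], "b": ["x", "y"], "c": [0.5, 1.5]})
```

The query engine can then call `fetch` with just the columns a query needs, which is the mechanism behind the speedups reported below.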
```python
from ._dfi_types import DtypeKind, DataFrame as DfiDataFrame
from .datasource import Datasource


# Taken from private pyarrow utilities
```
I copied some private utilities from pyarrow that handle converting DataFrame Interchange Protocol types to pyarrow schema types.
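As a rough illustration of what that conversion involves, here is a self-contained sketch that maps interchange-protocol dtypes to Arrow type names. The `DtypeKind` values follow the published DataFrame Interchange Protocol spec; the `dfi_to_arrow_typename` helper is hypothetical and far less complete than the real pyarrow utilities (which also handle datetimes, categoricals, bitmasks, etc.).

```python
from enum import IntEnum


class DtypeKind(IntEnum):
    # Values as defined by the DataFrame Interchange Protocol spec
    INT = 0
    UINT = 1
    FLOAT = 2
    BOOL = 20
    STRING = 21
    DATETIME = 22
    CATEGORICAL = 23


def dfi_to_arrow_typename(kind, bit_width):
    """Map a (DtypeKind, bit_width) pair to an Arrow type name (sketch)."""
    if kind == DtypeKind.INT:
        return f"int{bit_width}"
    if kind == DtypeKind.UINT:
        return f"uint{bit_width}"
    if kind == DtypeKind.FLOAT:
        # Arrow's naming for floating point widths
        return {16: "halffloat", 32: "float", 64: "double"}[bit_width]
    if kind == DtypeKind.BOOL:
        return "bool"
    if kind == DtypeKind.STRING:
        # Interchange strings map to plain utf8, never large_utf8
        return "string"
    raise NotImplementedError(f"Unhandled dtype kind: {kind}")
```

Note the comment on `STRING`: the converted types are always plain `utf8`, which is what motivates the cast discussed below.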
```python
columns = list(columns)
projected_schema = pa.schema([f for f in self._schema if f.name in columns])
table = from_dataframe(self._dataframe.select_columns_by_name(columns))
return table.cast(projected_schema, safe=False)
```
I found that this cast was needed to handle the case where polars returns a LargeUTF8 column, since the converted pyarrow types are never LargeUTF8.
```rust
let result_updates = py.allow_threads(|| {
    self.tokio_runtime
        .block_on(self.state.update(&self.runtime, updates))
})?;
```
This was needed to avoid a deadlock, now that the DataFusion datasource may need to acquire the GIL.
This file is adapted from the DataFusion custom datasource example: https://github.com/apache/arrow-datafusion/blob/47fd9bf5b7a1b931e6e8bd323a01ae54fda261e5/datafusion-examples/examples/custom_datasource.rs
Closes #386
This PR adds custom DataFusion datasources, written in Python, for pandas DataFrames and for objects that adhere to the DataFrame Interchange Protocol (i.e. that have a `__dataframe__` method). With this approach, we no longer convert these objects to arrow up front before passing them into VegaFusion. Instead, they are converted to arrow dynamically during the DataFusion query. This makes it possible to down-select to the required columns before converting to Arrow, which can be much faster for DataFrames that include lots of columns.

Example of a 10-million-row histogram with pandas:
10243200
Timing `chart.to_dict()`:
Before: 7.04 s ± 81.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
After: 336 ms ± 11.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
That's 20x faster!
This is because we're only converting 1 out of 17 columns to arrow, and skipping several string columns which are particularly slow to convert.
The duckdb connection against pandas is still a bit faster, but the DataFusion connection is now within a factor of 2 for this example.
DuckDB: 199 ms ± 2.92 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
The performance results for DataFrame Interchange Protocol objects will vary depending on how expensive it is to convert their contents to arrow using PyArrow. In this case we're using `dfi.select_columns_by_name(columns)` to filter down the source columns before converting to Arrow.

cc @ivirshup
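For reference, a minimal sketch of that projection step against a pandas DataFrame (pandas 1.5+ exposes the interchange protocol via `__dataframe__()`; the column names here are made up):

```python
import pandas as pd

df = pd.DataFrame({
    "a": [1, 2, 3],
    "b": ["x", "y", "z"],
    "c": [1.0, 2.0, 3.0],
})

# Obtain the DataFrame Interchange Protocol view of the DataFrame
dfi = df.__dataframe__()

# Project down to one column before any Arrow conversion happens,
# so the string column "b" is never converted
projected = dfi.select_columns_by_name(["a"])
```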