64 changes: 64 additions & 0 deletions databricks/koalas/spark.py
@@ -24,6 +24,7 @@
import pyspark
from pyspark import StorageLevel
from pyspark.sql import Column
from pyspark.sql import DataFrame as SparkDataFrame

if TYPE_CHECKING:
    import databricks.koalas as ks
@@ -677,6 +678,69 @@ def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None):
        else:
            self._kdf._internal.to_internal_spark_frame.explain(extended, mode)

    def apply(self, func, index_col=None):
        """
        Applies a function that takes and returns a Spark DataFrame. It allows the
        function to natively use Spark functions and column APIs against the Spark
        DataFrame that internally backs this DataFrame.

        .. note:: set `index_col` and keep the column named as such in the output
            Spark DataFrame to avoid using the default index; if you omit
            `index_col`, the default index is used, which is potentially expensive
            in general.

        .. note:: it will lose column labels. This is a synonym for
            ``func(kdf.to_spark(index_col)).to_koalas(index_col)``.

        Parameters
        ----------
        func : function
            Function to apply to the data as a Spark DataFrame.
        index_col : str or list of str, optional
            Column names to be used in the Spark representation of the index.

        Returns
        -------
        DataFrame
Contributor:
Just my opinion, but how about "Koalas DataFrame" or ``ks.DataFrame`` rather than just "DataFrame", since we describe this function as "Applies a function that takes and returns a Spark DataFrame"?
I think it can be confusing whether the return type is a Spark DataFrame or a Koalas DataFrame.


        Raises
        ------
        ValueError : If the output from the function is not a Spark DataFrame.

        Examples
        --------
        >>> from databricks import koalas as ks
        >>> kdf = ks.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, columns=["a", "b"])
        >>> kdf
           a  b
        0  1  4
        1  2  5
        2  3  6

        >>> kdf.spark.apply(
        ...     lambda sdf: sdf.selectExpr("a + b as c", "index"), index_col="index")
        ... # doctest: +NORMALIZE_WHITESPACE
               c
        index
        0      5
        1      7
        2      9

        The example below ends up using the default index, which should be avoided
        if possible.

        >>> kdf.spark.apply(lambda sdf: sdf.groupby("a").count().sort("a"))
           a  count
        0  1      1
        1  2      1
        2  3      1
        """
        output = func(self.frame(index_col))
        if not isinstance(output, SparkDataFrame):
            raise ValueError(
                "The output of the function [%s] should be a "
                "pyspark.sql.DataFrame; however, got [%s]." % (func, type(output))
            )
        return output.to_koalas(index_col)


class CachedSparkFrameMethods(SparkFrameMethods):
    """Spark related features for cached DataFrame. This is usually created via
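As a quick aside (not part of the diff), here is a minimal usage sketch of the new API based on the equivalence the docstring states; the data, column names, and the double helper below are made up for illustration, and a working koalas/Spark session is assumed:

from databricks import koalas as ks

kdf = ks.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

def double(sdf):
    # `sdf` is a plain pyspark.sql.DataFrame here, so any Spark API is available.
    return sdf.selectExpr("index", "a * 2 as a", "b * 2 as b")

# Per the docstring, the two paths below are equivalent, and both return a
# Koalas DataFrame (not a Spark DataFrame) because apply() ends with to_koalas().
via_apply = kdf.spark.apply(double, index_col="index")
via_round_trip = double(kdf.to_spark(index_col="index")).to_koalas(index_col="index")
assert sorted(via_apply.to_pandas()["a"]) == sorted(via_round_trip.to_pandas()["a"])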
25 changes: 25 additions & 0 deletions databricks/koalas/tests/test_frame_spark.py
@@ -0,0 +1,25 @@
#
# Copyright (C) 2019 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from databricks import koalas as ks
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils


class SparkFrameMethodsTest(ReusedSQLTestCase, SQLTestUtils):
    def test_frame_apply_negative(self):
        with self.assertRaisesRegex(
            ValueError, "The output of the function.* pyspark.sql.DataFrame.*int"
        ):
            ks.range(10).spark.apply(lambda scol: 1)
Member Author:
I will move the relevant test cases in here in a separate PR; e.g., print_schema is currently in test_dataframe.py.
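Pending that follow-up, a hedged sketch of what one positive-path case inside SparkFrameMethodsTest might look like (the test name, data, and assertion are hypothetical, not part of this PR):

    def test_frame_apply(self):
        # Hypothetical happy-path case: run a Spark-side transformation and
        # check that a Koalas DataFrame with the expected values comes back.
        kdf = ks.DataFrame({"a": [1, 2, 3]})
        applied = kdf.spark.apply(
            lambda sdf: sdf.selectExpr("index", "a + 1 as a"), index_col="index"
        )
        self.assertEqual(sorted(applied.to_pandas()["a"].tolist()), [2, 3, 4])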

1 change: 1 addition & 0 deletions docs/source/reference/frame.rst
@@ -269,6 +269,7 @@ in Spark. These can be accessed by ``DataFrame.spark.<function/property>``.
DataFrame.spark.to_table
DataFrame.spark.to_spark_io
DataFrame.spark.explain
DataFrame.spark.apply

Plotting
-------------------------------