83 changes: 81 additions & 2 deletions databricks/koalas/frame.py
@@ -38,6 +38,7 @@
from pandas.core.dtypes.common import _get_dtype_from_object as infer_dtype_from_object
from pandas.core.accessor import CachedAccessor
from pandas.core.dtypes.inference import is_sequence
from pyspark import StorageLevel
from pyspark import sql as spark
from pyspark.sql import functions as F, Column
from pyspark.sql.functions import pandas_udf
@@ -3899,6 +3900,12 @@ def cache(self):
The Koalas DataFrame is yielded as a protected resource and its corresponding
data is cached, which gets uncached once execution leaves the context.

If you want to specify the StorageLevel manually, use :meth:`DataFrame.persist`.

See Also
--------
DataFrame.persist

Examples
--------
>>> df = ks.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
@@ -3931,6 +3938,71 @@ def cache(self):
"""
return _CachedDataFrame(self._internal)

def persist(self, storage_level=StorageLevel.MEMORY_AND_DISK):
"""
Yields and caches the current DataFrame with a specific StorageLevel.
If a StorageLevel is not given, the `MEMORY_AND_DISK` level is used by default, as in PySpark.

The Koalas DataFrame is yielded as a protected resource and its corresponding
data is cached, which gets uncached once execution leaves the context.

See Also
--------
DataFrame.cache

Examples
--------
>>> import pyspark
>>> df = ks.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
... columns=['dogs', 'cats'])
>>> df
dogs cats
0 0.2 0.3
1 0.0 0.6
2 0.6 0.0
3 0.2 0.1

Set the StorageLevel to `MEMORY_ONLY`.

>>> with df.persist(pyspark.StorageLevel.MEMORY_ONLY) as cached_df:
... print(cached_df.count())
...
dogs 4
cats 4
Name: 0, dtype: int64

Set the StorageLevel to `DISK_ONLY`.

>>> with df.persist(pyspark.StorageLevel.DISK_ONLY) as cached_df:
... print(cached_df.count())
...
dogs 4
cats 4
Name: 0, dtype: int64

If a StorageLevel is not given, it uses `MEMORY_AND_DISK` by default.

>>> with df.persist() as cached_df:
... print(cached_df.count())
...
dogs 4
cats 4
Name: 0, dtype: int64

>>> df = df.persist()
>>> df.to_pandas().mean(axis=1)
0 0.25
1 0.30
2 0.30
3 0.15
dtype: float64

To uncache the DataFrame, use the `unpersist` function.

>>> df.unpersist()
"""
return _CachedDataFrame(self._internal, storage_level=storage_level)

def to_table(
self,
name: str,
@@ -10078,8 +10150,15 @@ class _CachedDataFrame(DataFrame):
it caches the corresponding Spark DataFrame.
"""

def __init__(self, internal):
self._cached = internal._sdf.cache()
def __init__(self, internal, storage_level=None):
if storage_level is None:
self._cached = internal._sdf.cache()
elif isinstance(storage_level, StorageLevel):
self._cached = internal._sdf.persist(storage_level)
else:
raise TypeError(
"Only a valid pyspark.StorageLevel type is acceptable for the `storage_level`"
)
super(_CachedDataFrame, self).__init__(internal)

def __enter__(self):
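The hunk above truncates at `__enter__`. For orientation, here is a minimal sketch of how the context-manager protocol presumably completes, assuming `__exit__` simply unpersists the Spark DataFrame held in `self._cached` (a reading of the docstring's "uncached once execution leaves the context", not the verbatim merged code):

    def __enter__(self):
        # Yield the cached frame itself as the protected resource.
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # Release the cached data when the `with` block exits, whether or
        # not an exception was raised inside it.
        self._cached.unpersist()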
29 changes: 29 additions & 0 deletions databricks/koalas/tests/test_dataframe.py
@@ -23,13 +23,15 @@
import numpy as np
import pandas as pd
import pyspark
from pyspark import StorageLevel
from pyspark.ml.linalg import SparseVector

from databricks import koalas as ks
from databricks.koalas.config import option_context
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils
from databricks.koalas.exceptions import PandasNotImplementedError
from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame
from databricks.koalas.frame import _CachedDataFrame


class DataFrameTest(ReusedSQLTestCase, SQLTestUtils):
@@ -3133,3 +3135,30 @@ def test_to_markdown(self):
self.assertRaises(NotImplementedError, lambda: kdf.to_markdown())
else:
self.assert_eq(pdf.to_markdown(), kdf.to_markdown())

def test_cache(self):
pdf = pd.DataFrame(
[(0.2, 0.3), (0.0, 0.6), (0.6, 0.0), (0.2, 0.1)], columns=["dogs", "cats"]
)
kdf = ks.from_pandas(pdf)

with kdf.cache() as cached_df:
self.assert_eq(isinstance(cached_df, _CachedDataFrame), True)

def test_persist(self):
pdf = pd.DataFrame(
[(0.2, 0.3), (0.0, 0.6), (0.6, 0.0), (0.2, 0.1)], columns=["dogs", "cats"]
)
kdf = ks.from_pandas(pdf)
storage_levels = [
StorageLevel.DISK_ONLY,
StorageLevel.MEMORY_AND_DISK,
StorageLevel.MEMORY_ONLY,
StorageLevel.OFF_HEAP,
]

for storage_level in storage_levels:
with kdf.persist(storage_level) as cached_df:
self.assert_eq(isinstance(cached_df, _CachedDataFrame), True)

self.assertRaises(TypeError, lambda: kdf.persist("DISK_ONLY"))
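As a companion to `test_persist`, a doctest-style sketch of the two behaviors it exercises, assuming a live Koalas session; `_cached` is the private attribute set in `_CachedDataFrame.__init__`, and the printed storage level is illustrative of PySpark's `StorageLevel` repr:

>>> import pyspark
>>> kdf = ks.DataFrame([(.2, .3), (.0, .6)], columns=['dogs', 'cats'])
>>> with kdf.persist(pyspark.StorageLevel.DISK_ONLY) as cached_df:
...     print(cached_df._cached.storageLevel)
StorageLevel(True, False, False, False, 1)
>>> kdf.persist("DISK_ONLY")  # strings are rejected; pass a pyspark.StorageLevel
Traceback (most recent call last):
  ...
TypeError: Only a valid pyspark.StorageLevel type is acceptable for `storage_level`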
1 change: 1 addition & 0 deletions docs/source/reference/frame.rst
@@ -219,6 +219,7 @@ Cache
:toctree: api/

DataFrame.cache
DataFrame.persist

Serialization / IO / Conversion
-------------------------------