Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions databricks/koalas/spark/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,11 +939,38 @@ def repartition(self, num_partitions: int) -> "ks.DataFrame":
from databricks.koalas.frame import DataFrame

internal = self._kdf._internal.resolved_copy

repartitioned_sdf = internal.spark_frame.repartition(num_partitions)

return DataFrame(internal.with_new_sdf(repartitioned_sdf))

def coalesce(self, num_partitions: int) -> "ks.DataFrame":
    """
    Returns a new DataFrame that has exactly `num_partitions` partitions.

    .. note:: This operation results in a narrow dependency, e.g. if you go from 1000
        partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new
        partitions will claim 10 of the current partitions. If a larger number of partitions is
        requested, it will stay at the current number of partitions. However, if you're doing a
        drastic coalesce, e.g. to num_partitions = 1, this may result in your computation taking
        place on fewer nodes than you like (e.g. one node in the case of num_partitions = 1). To
        avoid this, you can call repartition(). This will add a shuffle step, but means the
        current upstream partitions will be executed in parallel (per whatever the current
        partitioning is).

    Parameters
    ----------
    num_partitions : int
        The target number of partitions.

    Returns
    -------
    DataFrame
    """
    from databricks.koalas.frame import DataFrame

    # Resolve the internal frame first so the coalesce applies to the
    # materialized plan, then swap the coalesced Spark frame back in.
    resolved = self._kdf._internal.resolved_copy
    sdf = resolved.spark_frame.coalesce(num_partitions)
    return DataFrame(resolved.with_new_sdf(sdf))

@property
def analyzed(self) -> "ks.DataFrame":
"""
Expand Down
29 changes: 29 additions & 0 deletions databricks/koalas/tests/test_frame_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,32 @@ def test_repartition(self):
new_kdf = kdf.spark.repartition(num_partitions)
self.assertEqual(new_kdf.to_spark().rdd.getNumPartitions(), num_partitions)
self.assert_eq(kdf.sort_index(), new_kdf.sort_index())

def test_coalesce(self):
    """Check that DataFrame.spark.coalesce reduces the partition count while
    preserving data, index (including MultiIndex), and internal changes."""
    num_partitions = 10
    kdf = ks.DataFrame({"age": [5, 5, 2, 2], "name": ["Bob", "Bob", "Alice", "Alice"]})
    kdf = kdf.spark.repartition(num_partitions)

    # NOTE: `--num_partitions` is NOT a decrement in Python — it is double
    # unary minus, a no-op. Decrement explicitly so coalesce is actually
    # tested against a smaller target than the current partition count.
    num_partitions -= 1
    new_kdf = kdf.spark.coalesce(num_partitions)
    self.assertEqual(new_kdf.to_spark().rdd.getNumPartitions(), num_partitions)
    self.assert_eq(kdf.sort_index(), new_kdf.sort_index())

    # Preserves Index
    kdf = kdf.set_index("age")
    num_partitions -= 1
    new_kdf = kdf.spark.coalesce(num_partitions)
    self.assertEqual(new_kdf.to_spark().rdd.getNumPartitions(), num_partitions)
    self.assert_eq(kdf.sort_index(), new_kdf.sort_index())

    # Reflects internal changes
    kdf = kdf.reset_index()
    kdf = kdf.set_index("name")
    kdf2 = kdf + 1
    num_partitions -= 1
    self.assert_eq(kdf2.sort_index(), (kdf + 1).spark.coalesce(num_partitions).sort_index())

    # Preserves MultiIndex
    kdf = ks.DataFrame({"a": ["a", "b", "c"]}, index=[[1, 2, 3], [4, 5, 6]])
    num_partitions -= 1
    kdf = kdf.spark.repartition(num_partitions)

    num_partitions -= 1
    new_kdf = kdf.spark.coalesce(num_partitions)
    self.assertEqual(new_kdf.to_spark().rdd.getNumPartitions(), num_partitions)
    self.assert_eq(kdf.sort_index(), new_kdf.sort_index())
1 change: 1 addition & 0 deletions docs/source/reference/frame.rst
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ in Spark. These can be accessed by ``DataFrame.spark.<function/property>``.
DataFrame.spark.explain
DataFrame.spark.apply
DataFrame.spark.repartition
DataFrame.spark.coalesce

.. _api.dataframe.plot:

Expand Down