docs/source/user_guide/best_practices.rst (38 additions, 0 deletions)
@@ -69,6 +69,43 @@
Even though Koalas tries its best to optimize and reduce such shuffle operations by leveraging Spark
optimizers, it is best to avoid shuffling on the application side whenever possible.


Use checkpoint
--------------

After many operations on Koalas objects, the underlying Spark query planner can slow down due to a huge and complex plan.
If the Spark plan becomes huge or planning takes a long time, ``DataFrame.spark.checkpoint()``
or ``DataFrame.spark.local_checkpoint()`` can help.

.. code-block:: python

>>> import databricks.koalas as ks
>>> kdf = ks.DataFrame({'id': range(10)})
>>> kdf = kdf[kdf.id > 5]
>>> kdf['id'] = kdf['id'] + (10 * kdf['id'] + kdf['id'])
>>> kdf = kdf.groupby('id').head(2)
>>> kdf.spark.explain()
== Physical Plan ==
*(3) Project [__index_level_0__#0L, id#31L]
+- *(3) Filter (isnotnull(__row_number__#44) AND (__row_number__#44 <= 2))
+- Window [row_number() windowspecdefinition(__groupkey_0__#36L, __natural_order__#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_number__#44], [__groupkey_0__#36L], [__natural_order__#16L ASC NULLS FIRST]
+- *(2) Sort [__groupkey_0__#36L ASC NULLS FIRST, __natural_order__#16L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(__groupkey_0__#36L, 200), true, [id=#33]
+- *(1) Project [__index_level_0__#0L, (id#1L + ((id#1L * 10) + id#1L)) AS __groupkey_0__#36L, (id#1L + ((id#1L * 10) + id#1L)) AS id#31L, __natural_order__#16L]
+- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#16L]
+- *(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

>>> kdf = kdf.spark.local_checkpoint() # or kdf.spark.checkpoint()
>>> kdf.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#0L, id#31L]
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#31L,__natural_order__#59L]

As you can see, the previous Spark plan is dropped and the computation starts over from a simple plan.
The result of the previous DataFrame is stored in the configured file system when calling ``DataFrame.spark.checkpoint()``,
or on the executors when calling ``DataFrame.spark.local_checkpoint()``.
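
Note that ``DataFrame.spark.checkpoint()`` requires a checkpoint directory to be set on the underlying
``SparkContext`` beforehand; otherwise Spark raises an error. ``DataFrame.spark.local_checkpoint()`` does not
need one. A minimal sketch, assuming a hypothetical local directory ``/tmp/checkpoints``:

.. code-block:: python

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> spark.sparkContext.setCheckpointDir('/tmp/checkpoints')  # hypothetical path
>>> kdf = kdf.spark.checkpoint()  # results are persisted under the directory above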


Avoid shuffling
---------------

@@ -173,6 +210,7 @@
the index column. Specify the index column whenever possible.

See `working with PySpark <pandas_pyspark.rst#pyspark>`_
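
As a minimal sketch of this advice, assuming an existing ``SparkSession`` named ``spark``, the index
column can be specified when converting a PySpark DataFrame to Koalas:

.. code-block:: python

>>> import databricks.koalas as ks
>>> sdf = spark.range(10)  # a PySpark DataFrame with an 'id' column
>>> kdf = sdf.to_koalas(index_col='id')  # reuses 'id' as the index; no default index is created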


Use ``distributed`` or ``distributed-sequence`` default index
-------------------------------------------------------------
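
A minimal sketch of switching the default index type through the Koalas options API (the accepted
values are ``'sequence'``, ``'distributed-sequence'``, and ``'distributed'``):

.. code-block:: python

>>> import databricks.koalas as ks
>>> ks.set_option('compute.default_index_type', 'distributed-sequence')
>>> ks.get_option('compute.default_index_type')
'distributed-sequence'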
